Kafka Connect HDFS: SequenceFileWriter creates bad files on connector restart, causing EOFException in SequenceFileReader

s8vozzvw · posted 2021-06-06 in Kafka

In kafka-connect-hdfs we have the following SequenceFileWriter.java class, which writes Kafka messages in SequenceFile format (a sketch of how such a provider might be configured follows the class).

import java.io.IOException;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Provider of a Sequence File record writer.
 */
public class SequenceFileWriterProvider implements RecordWriterProvider
{
  public String getExtension() {
    return "";
  }

  @Override
  public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException {
    Path path = new Path(fileName);

    final SequenceFile.Writer writer;
    SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
    SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
    SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
    SequenceFile.Writer.Option optCodec = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
    writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);

    return new RecordWriter<SinkRecord>() {
      @Override
      public void write(SinkRecord record) throws IOException {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text((byte[]) record.value())
        );
      }

      @Override
      public void close() throws IOException {
        writer.close();
      }
    };
  }
}
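
For context, a custom RecordWriterProvider like this is typically exposed through a Format implementation (not shown in the question) and wired into the HDFS sink via the connector's format.class property. The snippet below is only an illustrative sketch; the format class, topic, and HDFS URL are placeholders, not values taken from the question.

name=hdfs-sequencefile-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# Hypothetical Format implementation whose record-writer provider is the
# SequenceFileWriterProvider shown above (class name is illustrative)
format.class=com.example.SequenceFileFormat
topics=my-topic
hdfs.url=hdfs://namenode:8020
flush.size=1000
tasks.max=1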

We run Confluent 5.0.0 in Docker containers managed by Kubernetes. We have observed that when we delete the k8s replication controller running the Kafka connector and then recreate it, some of the sequence files become corrupt. We have a Spark job that reads this data with SequenceFileReader and gets the EOFException below. We also noticed two extra bytes at the end of the corrupt files. We suspect a problem with the SequenceFileWriter and need help validating the writer. Any help would be appreciated. Thanks.

java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
    at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
    at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
    at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
    at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
    at badSequenceFile.main(badSequenceFile.java:345)
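
For reference, a reader roughly along these lines (a minimal standalone sketch, not the actual Spark job; the class name and path argument are placeholders) hits the same exception when pointed at one of the corrupt files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class BadSequenceFileCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // args[0]: HDFS or local path of a sequence file written by the connector
    Path path = new Path(args[0]);
    LongWritable key = new LongWritable();
    Text value = new Text();
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
      // next()/getCurrentValue() is where the EOFException surfaces on a truncated file
      while (reader.next(key, value)) {
        System.out.println(key.get() + "\t" + value);
      }
    }
  }
}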

Note: if we delete the connector's temporary files (+tmp directory) before starting the k8s replication controller, the connector starts clean and no bad files are created.


zpf6vheq · Answer 1

Modifying writer.append to handle exceptions appears to have fixed the problem of bad sequence files being written without a proper end-of-file (EOF) marker. In addition, the record value is now converted from byte[] to String before being appended.

return new RecordWriter<SinkRecord>() {
  @Override
  public void write(SinkRecord record) {
    if (record != null) {
      byte[] text = (byte[]) record.value();
      try {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text(new String(text))
        );
      } catch (Exception e) {
        // logger is assumed to be a logging facade (e.g. SLF4J) declared on the enclosing class
        logger.error("Exception encountered: " + e + " for text: " + new String(text));
      }
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
};
