In Kafka Connect HDFS, we have the following SequenceFileWriter.java class, which writes Kafka messages in SequenceFileFormat:
import java.io.IOException;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;
/**
 * Provider of a Sequence File record writer.
 */
public class SequenceFileWriterProvider implements RecordWriterProvider {

  @Override
  public String getExtension() {
    return "";
  }

  @Override
  public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName,
      SinkRecord record, AvroData avroData) throws IOException {
    Path path = new Path(fileName);
    // Block-compressed SequenceFile keyed by wall-clock timestamp; the value is the raw message
    SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
    SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
    SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
    SequenceFile.Writer.Option optCodec =
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
    final SequenceFile.Writer writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);

    return new RecordWriter<SinkRecord>() {
      @Override
      public void write(SinkRecord record) throws IOException {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text((byte[]) record.value())
        );
      }

      @Override
      public void close() throws IOException {
        writer.close();
      }
    };
  }
}
We run Confluent 5.0.0 in Docker containers managed by Kubernetes. We have observed that when we delete the k8s replication controller running the Kafka connector and then recreate it, some of the sequence files become corrupted. We have a Spark job that reads this data with SequenceFileReader and hits the EOFException below. We also noticed that there are two extra bytes at the end of each corrupted file. We suspect the SequenceFileWriter is at fault and would appreciate help validating the writer. Thanks.
java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
at badSequenceFile.main(badSequenceFile.java:345)
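For reference, the read path in badSequenceFile is essentially the standard SequenceFile.Reader loop. A simplified sketch follows (the class layout and key/value types are assumed from the writer above, not our exact code):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class badSequenceFile {
  public static void main(String[] args) throws IOException {
    readSequenceFile(new Path(args[0]));
  }

  static void readSequenceFile(Path path) throws IOException {
    Configuration conf = new Configuration();
    LongWritable key = new LongWritable();
    Text value = new Text();
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
      // reader.next(...) is where the EOFException above surfaces when the
      // reader runs into the trailing extra bytes of a corrupted file
      while (reader.next(key, value)) {
        System.out.println(key.get() + "\t" + value);
      }
    }
  }
}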
Note: if we delete the connector's temporary files (+tmp) before starting the k8s replication controller, the connector starts clean and no bad files are created.
1 Answer
Modifying writer.append to handle exceptions appears to have fixed the problem of bad sequence files being written with a misplaced end-of-file (EOF) marker. In addition, the record value was converted from byte to String. A sketch of one possible version of this change follows below.
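For anyone hitting the same issue, one plausible reading of that fix is the write method below. This is a hedged sketch, not the exact committed code: the UTF-8 charset and the catch-and-close behavior are assumptions about how the exception was "handled":

@Override
public void write(SinkRecord record) throws IOException {
  try {
    // convert the record value from byte[] to String before wrapping it in Text
    // (assumption: the payload is UTF-8 encoded text)
    String value = new String((byte[]) record.value(), java.nio.charset.StandardCharsets.UTF_8);
    writer.append(new LongWritable(System.currentTimeMillis()), new Text(value));
  } catch (IOException e) {
    // assumption: close the writer on a failed append so the file is not
    // later finalized around a half-written record (the misplaced EOF marker)
    writer.close();
    throw e;
  }
}

SequenceFile.Writer.close() is idempotent, so the later close() call on the RecordWriter remains safe after a failed append has already closed the underlying writer.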