EOFException after reading the first event of an Avro file

r1zk6ea1 · posted 2021-07-15 · in Flume
  import java.io.File;
  import java.io.FileInputStream;
  import java.io.IOException;
  import org.apache.avro.Schema;
  import org.apache.avro.io.BinaryDecoder;
  import org.apache.avro.io.DecoderFactory;
  import org.apache.avro.specific.SpecificDatumReader;
  import com.quotes.Quotes;

  public class BinaryDecoderApp {
      public static void main(String[] args) throws IOException {
          Schema schema = new Schema.Parser().parse(new File("src/main/resources/lpquotes.avsc"));
          File avroFile = new File("src/main/resources/FlumeData.1619451750874");
          BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new FileInputStream(avroFile), null);
          SpecificDatumReader<Quotes> datumReader = new SpecificDatumReader<>(schema);
          while (!decoder.isEnd()) {
              Quotes record = datumReader.read(null, decoder);
              System.out.println(record);
          }
      }
  }

The simple code above is what I use to decode the Avro events that Flume has written to HDFS. The problem I am running into is that after the first event of the file is read (and printed correctly), I get this exception:

  Exception in thread "main" java.io.EOFException
      at org.apache.avro.io.BinaryDecoder.readDouble(BinaryDecoder.java:272)
      at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:197)
      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:201)
      at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
      at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
      at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
      at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
      at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
      at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
      at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
      at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
      at com.quotes.reader.BinaryDecoderApp.main(BinaryDecoderApp.java:27)

Apparently Flume may be putting something extra at the end of each Avro event, which produces this exception, but I cannot find a way to read the file correctly.
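For reference, here is a minimal sketch of the other approach I have been considering: reading the same file as an Avro object container file with DataFileReader instead of a raw BinaryDecoder. This assumes (and I am not sure the assumption holds for my file) that the avro_event serializer wrote the container format with the writer schema embedded in the file header; the class name ContainerFileReaderApp is just for illustration:

  // Illustrative sketch only (class name is made up). It assumes the FlumeData file
  // is an Avro object container file with the writer schema embedded in its header,
  // which is what I understand the avro_event serializer normally produces.
  import java.io.File;
  import java.io.IOException;
  import org.apache.avro.file.DataFileReader;
  import org.apache.avro.generic.GenericDatumReader;
  import org.apache.avro.generic.GenericRecord;

  public class ContainerFileReaderApp {
      public static void main(String[] args) throws IOException {
          File avroFile = new File("src/main/resources/FlumeData.1619451750874");
          // No explicit schema: the reader takes the writer schema from the container header.
          GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
          try (DataFileReader<GenericRecord> fileReader = new DataFileReader<>(avroFile, datumReader)) {
              while (fileReader.hasNext()) {
                  GenericRecord event = fileReader.next();
                  // With avro_event these records should be Flume event wrappers (headers + body bytes),
                  // not Quotes records, so the body would still need its own decoding step.
                  System.out.println(event);
              }
          }
      }
  }

Even if that reads the file without an EOFException, I would presumably still have to decode each record's body separately to get my Quotes objects, so I am not sure this is the right direction.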
The Flume configuration is as follows:

  tier1.channels = c1
  tier1.sources = r1
  tier1.sinks = k1

  # AvroSource r1
  tier1.sources.r1.channels = c1
  tier1.sources.r1.type = avro
  tier1.sources.r1.bind = 0.0.0.0
  tier1.sources.r1.port = 60001

  # Channel c1
  tier1.channels.c1.type = file

  # HdfsSink k1
  tier1.sinks.k1.channel=c1
  tier1.sinks.k1.type=hdfs

  ## HdfsSink k1 sinking properties
  tier1.sinks.k1.hdfs.path=/
  tier1.sinks.k1.hdfs.fileType = DataStream
  tier1.sinks.k1.hdfs.batchSize = 100000
  tier1.sinks.k1.hdfs.rollSize = 0
  tier1.sinks.k1.hdfs.rollCount = 0
  tier1.sinks.k1.hdfs.rollInterval = 60
  tier1.sinks.k1.hdfs.threadPoolSize = 500
  tier1.sinks.k1.hdfs.callTimeout = 180000
  tier1.sinks.k1.hdfs.serializer = avro_event
  tier1.sinks.k1.hdfs.writeFormat = Text
