EOFException after reading the first event of an Avro file

r1zk6ea1  posted on 2021-07-15  in Flume
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

import com.quotes.Quotes;

public class BinaryDecoderApp {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("src/main/resources/lpquotes.avsc"));

        File avroFile = new File("src/main/resources/FlumeData.1619451750874");

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new FileInputStream(avroFile), null);
        SpecificDatumReader<Quotes> datumReader = new SpecificDatumReader<>(schema);

        while (!decoder.isEnd()) {

            Quotes record = datumReader.read(null, decoder);
            System.out.println(record);
        }
    }
}

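The simple code above is what I use to decode the Avro events that Flume wrote to HDFS. The problem is that after the first event in the file is read (and printed correctly), I get this exception: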

Exception in thread "main" java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.readDouble(BinaryDecoder.java:272)
    at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:197)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:201)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at com.quotes.reader.BinaryDecoderApp.main(BinaryDecoderApp.java:27)

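It looks as if Flume puts some extra character at the end of each Avro event, and that is what triggers the exception.
However, I cannot find a way to read the file correctly.
The Flume configuration is as follows: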

tier1.channels = c1
tier1.sources = r1
tier1.sinks = k1

# AvroSource r1

tier1.sources.r1.channels = c1

tier1.sources.r1.type = avro
tier1.sources.r1.bind = 0.0.0.0
tier1.sources.r1.port = 60001

# Channel c1

tier1.channels.c1.type = file

# HdfsSink k1

tier1.sinks.k1.channel=c1
tier1.sinks.k1.type=hdfs

## HdfsSink k1 sinking properties

tier1.sinks.k1.hdfs.path=/
tier1.sinks.k1.hdfs.fileType = DataStream
tier1.sinks.k1.hdfs.batchSize = 100000
tier1.sinks.k1.hdfs.rollSize = 0
tier1.sinks.k1.hdfs.rollCount = 0
tier1.sinks.k1.hdfs.rollInterval = 60
tier1.sinks.k1.hdfs.threadPoolSize = 500
tier1.sinks.k1.hdfs.callTimeout = 180000
tier1.sinks.k1.hdfs.serializer = avro_event
tier1.sinks.k1.hdfs.writeFormat = Text
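
One thing that may be worth checking, given `hdfs.serializer = avro_event` above: the Flume user guide describes that serializer as writing events into an Avro container file, and per the Avro specification a container file starts with the four bytes `O`, `b`, `j`, `0x01`, followed by a header and sync markers that a bare `BinaryDecoder` loop does not expect. The sketch below uses only the JDK to test a file for that magic. The class name `AvroContainerCheck` and its methods are hypothetical names made up for this illustration; they are not part of Avro or Flume.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class AvroContainerCheck {
    // Magic that opens an Avro object container file: 'O', 'b', 'j', 1.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the given bytes begin with the container-file magic. */
    public static boolean hasContainerMagic(byte[] data) {
        if (data.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (data[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    /** Reads at most MAGIC.length bytes from the start of the file. */
    public static byte[] readHead(String path) throws IOException {
        byte[] head = new byte[MAGIC.length];
        int off = 0;
        try (InputStream in = new FileInputStream(path)) {
            while (off < head.length) {
                int n = in.read(head, off, head.length - off);
                if (n < 0) {
                    break; // file is shorter than the magic
                }
                off += n;
            }
        }
        return Arrays.copyOf(head, off);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the default path is the file from the question; pass another as args[0].
        String path = args.length > 0 ? args[0] : "src/main/resources/FlumeData.1619451750874";
        if (hasContainerMagic(readHead(path))) {
            System.out.println("Starts with the Avro container magic");
        } else {
            System.out.println("No container magic; probably bare binary records");
        }
    }
}
```

If the magic is present, `org.apache.avro.file.DataFileReader` is the Avro class intended for reading container files, and it would likely be a better fit than `DecoderFactory.get().binaryDecoder(...)`. In that case the records in the file may also follow Flume's own event schema rather than `lpquotes.avsc`, so the schema embedded in the file header is worth inspecting first.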
