我有一个简单的avro模式,通过提供如下默认值,我在avro模式中添加了一个新字段:
{
"name": "channelType",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
使用以下代码反序列化数据。
public class RLoggerKafkaMessageDeSerializer implements Deserializer<RLoggerMessage> {
private final DecoderFactory decoderFactory = DecoderFactory.get();
private final DatumReader<RLoggerMessage> reader;
public RLoggerKafkaMessageDeSerializer() {
super();
reader = new SpecificDatumReader<RLoggerMessage>(RLoggerMessage.getClassSchema());
}
@Override
public RLoggerMessage deserialize(final String topic, final byte[] data) {
try {
BinaryDecoder decoder = decoderFactory.binaryDecoder(data, null);
RLoggerMessage rlogMessage = reader.read(null, decoder);
return rlogMessage;
} catch (Exception e) {
throw new SerializationException("Error deserialize message", e);
}
}
}
然后我得到了以下错误:
java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
注意:我找到了一个解决方案,即将旧模式和新模式添加到specificdataumreader中。请参阅此链接:avro模式不支持向后兼容。这个解决方案对我不起作用。我们的团队正在使用模式注册表。我已经将新架构上载到架构注册表。我的理解是应该选择最新的模式来序列化/反序列化数据。然而,出现了上述例外。
暂无答案!
目前还没有任何答案,快来回答吧!