无法读取Kafka主题avro消息

v6ylcynt  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(518)

debezium连接器的kafka连接事件是avro编码的。
在传递给kafka connect standalone服务的connect-standalone.properties中提到了以下内容。

key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081

使用以下属性配置kafka消费代码:

Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");

在使用者实现中,下面是读取键和值组件的代码。我正在使用rest从schema registry获取key和value的模式。

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));

解析密钥效果很好。在解析消息的值部分时,我得到了arrayindexoutofboundsexception。
下载了avro的源代码并进行了调试。发现genericdatumreader.readint方法返回负值。这个值应该是数组(符号)的索引,因此应该是正数。
尝试使用kafka avro独立使用者消费事件,但也抛出了arrayindexoutofboundsexception。因此,我猜测消息在kafka connect(producer)中的编码不正确&问题在于配置。
以下是问题:
生产者或消费者传递的配置有什么问题吗?
为什么密钥反序列化工作正常,但没有价值?
还有什么需要做的事情吗(比如在某处指定字符编码)。
使用avro的debezium能用于生产吗,还是目前的实验特性?DebeziumAvro上的帖子特别指出,涉及avro的例子将在未来被包括在内。
有很多帖子说avro反序列化将arrayindexoutofboundsexception抛出,但无法将其与我面临的问题联系起来。

dl5txlt9

dl5txlt91#

遵循中的步骤http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html &现在一切正常。

相关问题