我有一个遗留的基于c++的系统,它输出二进制编码的avro数据,支持合流的avro模式注册表格式。在我的java应用程序中,我使用kafkaavrodeserializer类成功地反序列化了消息,但无法打印出消息。
private void consumeAvroData(){
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "http://1.2.3.4:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", LongDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
// props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
props.put("schema.registry.url","http://1.2.3.4:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
System.out.println("Subscribed to topic " + TOPIC_NAME);
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records)
{
System.out.printf("value = %s\n",record.value());
}
}
}
我得到的结果是
{"value":"�"}
为什么我不能打印反序列化的数据?感谢您的帮助!
1条答案
按热度按时间kcrjzv8t1#
汇合avro序列化程序的wire格式在标题为“wire格式”的一节中进行了说明
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
它是一个魔术字节(当前总是0),后跟一个由schema注册表返回的4字节schema id,后跟一组字节,这些字节是avro二进制编码中的avro序列化数据。
如果您将消息作为bytearray读取并打印出前5个字节,您将知道这是否是一个真正合流的avro序列化消息。应为0,后跟0001或其他一些架构id,您可以检查它是否位于此主题的架构注册表中。
如果不是这种格式,那么消息很可能是以另一种方式序列化的(没有合流模式注册表),您需要使用不同的反序列化程序,或者从消息值中提取完整的模式,甚至需要从其他源获取原始的模式文件才能进行解码。