无法打印kafka avro解码的消息

pjngdqdw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(527)

我有一个遗留的基于c++的系统,它输出二进制编码的avro数据,支持合流的avro模式注册表格式。在我的java应用程序中,我使用kafkaavrodeserializer类成功地反序列化了消息,但无法打印出消息。

private void consumeAvroData(){
    String group = "group1";
    Properties props = new Properties();
    props.put("bootstrap.servers", "http://1.2.3.4:9092");
    props.put("group.id", group);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", LongDeserializer.class.getName());
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
   // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
    props.put("schema.registry.url","http://1.2.3.4:8081");
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);

    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    System.out.println("Subscribed to topic " + TOPIC_NAME);

    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<String, GenericRecord> record : records)
        {
            System.out.printf("value = %s\n",record.value());
        }
    }
}

我得到的结果是

{"value":"�"}

为什么我不能打印反序列化的数据?感谢您的帮助!

kcrjzv8t

kcrjzv8t1#

汇合avro序列化程序的wire格式在标题为“wire格式”的一节中进行了说明
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
它是一个魔术字节(当前总是0),后跟一个由schema注册表返回的4字节schema id,后跟一组字节,这些字节是avro二进制编码中的avro序列化数据。
如果您将消息作为bytearray读取并打印出前5个字节,您将知道这是否是一个真正合流的avro序列化消息。应为0,后跟0001或其他一些架构id,您可以检查它是否位于此主题的架构注册表中。
如果不是这种格式,那么消息很可能是以另一种方式序列化的(没有合流模式注册表),您需要使用不同的反序列化程序,或者从消息值中提取完整的模式,甚至需要从其他源获取原始的模式文件才能进行解码。

相关问题