如何从原始缓冲区对象获取字符串消息kafka java

cgh8pdjw  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(467)

有一个nodejs kafka producer将文件内容发送给kafka。当Kafka消费者消费了来自Kafka的消息时-

{"type":"Buffer","data":[91,13 .....]

当我使用 m.message.value.toString('utf8') 在nodejs kafka consumer中,它打印实际的消息。但我需要在java中消费kafka消费者。我试过了 property.put("value.serializer.encoding", "utf8") 以及 new String(consumerRecord.value()) 但仍然打印{ "type":"Buffer","data":[91,13 .....] . 我的问题是如何使用nodejs kafka producer生成的java字符串格式的消息。

30byixjq

30byixjq1#

您应该在生产者和消费者api调用中使用适当的键和值序列化程序。
kafka提供了一些默认的键和值序列化程序,如stringserializer等。
用于生成和使用字符串消息的字符串序列化程序。
props.put(“key.serializer”,“org.apache.kafka.common.serialization.stringserializer”);props.put(“value.serializer”,“org.apache.kafka.common.serialization.stringserializer”);
例如,您可以引用生产者和消费者文档。
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/kafkaproducer.htmlhttps用法:/kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/kafkaconsumer.html
schema registry当您从nodejs发送消息并使用kafka consumer使用该消息时,您需要在javakafka consumer中提供相同的消息反序列化器键和值。
有一种方法可以解决这个不兼容的问题,那就是使用schema registry来使用kafka avro存储消息的模式。然后我们可以在nodejs kafka producer和javakafka consumer中使用该模式来使用消息。
以avro为例的nodejs-kafka生产者
下面是一个nodejs kafka avro的例子

javakafka使用者使用模式注册表示例

相关问题