我一直在尝试将kafkaavro控制台使用者从confluent连接到遗留的kafka集群,该集群是在没有confluent模式注册表的情况下部署的。我使用如下属性显式提供了模式:
kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
--topic test \
--from-beginning \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"long"}'
但我得到了“未知的魔法字节!”错误 org.apache.kafka.common.errors.SerializationException
是否可以使用confluent kafka avro控制台使用者来使用来自kafka的avro消息,这些使用者未使用confluent的avroserializer和schema registry序列化?
2条答案
按热度按时间iugsix8n1#
合流模式注册表序列化程序/反序列化程序使用一种wire格式,该格式在消息的初始字节中包含有关模式id等的信息。
如果您的消息没有使用schema registry序列化程序序列化,那么您将无法用它反序列化消息,并将获得
Unknown magic byte!
错误。因此,您需要编写一个消费者来提取消息,使用avroavsc模式进行反序列化,然后假设您希望保留数据,使用模式注册表序列化程序重新序列化它
编辑:我最近写了一篇文章,更深入地解释了这整件事:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
pepwfjgg2#
kafka-console-consumer
对…一无所知key.schema
或者value.schema
,只有avro生产商才这么做。这里是源代码普通的控制台使用者并不关心数据的格式—它只会打印utf8编码的字节
财产
kafka-avro-console-consumer
只接受schema.registry.url
. 所以,要回答这个问题,是的,它需要使用合流序列化程序进行序列化。