我试图打印一个Kafka主题使用Kafkaavro控制台消费者在log4j格式avro消息。
为此,我使用以下kafka avro console consumer命令:
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter
我已通过以下命令导出Kafka选项:
export $KAFKA_OPTS= -Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties
现在,如果我运行常规kafka console consumer,使用以下命令:
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter
我能够生成启用log4j的输出:
[2018-07-17 19:09:40,514] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-07-17 19:09:40,522] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-07-17 19:09:40,523] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Setting newly assigned partitions [avro-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-07-17 19:09:40,531] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Resetting offset for partition avro-test-0 to offset 23. (org.apache.kafka.clients.consumer.internals.Fetcher)
但是,如果使用以下命令使用avro使用者,则此格式化选项不会生效:
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter
它只是求助于一个默认的格式化程序。
这里有什么我可能遗漏的吗?
1条答案
按热度按时间eqqqjvef1#
我想如果你推翻了
--formatter
,你将不会再收到avro消息kafka.tools.LoggingMessageFormatter
不知道如何反序列化avro参考-源代码
所以,它应该运行
kafka.tools.ConsoleConsumer --formatter kafka.tools.LoggingMessageFormatter
,因为默认值正在取消分配,并且schema-registry-run-class
正在定义KAFKA_OPTS
,但该行上不需要空格或美元符号