带有kafka avro控制台使用者的loggingmessageformatter

aelbi1ox  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我试图打印一个Kafka主题使用Kafkaavro控制台消费者在log4j格式avro消息。
为此,我使用以下kafka avro console consumer命令:

bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter

我已通过以下命令导出Kafka选项:

export $KAFKA_OPTS= -Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties

现在,如果我运行常规kafka console consumer,使用以下命令:

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter

我能够生成启用log4j的输出:

[2018-07-17 19:09:40,514] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-07-17 19:09:40,522] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-07-17 19:09:40,523] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Setting newly assigned partitions [avro-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-07-17 19:09:40,531] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Resetting offset for partition avro-test-0 to offset 23. (org.apache.kafka.clients.consumer.internals.Fetcher)

但是,如果使用以下命令使用avro使用者,则此格式化选项不会生效:

bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter

它只是求助于一个默认的格式化程序。
这里有什么我可能遗漏的吗?

eqqqjvef

eqqqjvef1#

我想如果你推翻了 --formatter ,你将不会再收到avro消息 kafka.tools.LoggingMessageFormatter 不知道如何反序列化avro
参考-源代码

DEFAULT_AVRO_FORMATTER="--formatter io.confluent.kafka.formatter.AvroMessageFormatter"

...

for OPTION in "$@"
do 
  case $OPTION in
    --formatter)
DEFAULT_AVRO_FORMATTER=""

...

exec $(dirname $0)/schema-registry-run-class kafka.tools.ConsoleConsumer $DEFAULT_AVRO_FORMATTER ...

所以,它应该运行 kafka.tools.ConsoleConsumer --formatter kafka.tools.LoggingMessageFormatter ,因为默认值正在取消分配,并且 schema-registry-run-class 正在定义 KAFKA_OPTS ,但该行上不需要空格或美元符号

export KAFKA_OPTS='-Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties'
bin/kafka-avro-console-consumer ...

相关问题