如何查看Kafka标题

slsn1g29  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(339)

我们正在使用org.apache.kafka.clients.producer.producerrecord向kafka发送带有标题的消息

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

如何使用命令查看这些标题。kafka-console-consumer.sh只显示有效负载,不显示标题。

rwqw0loc

rwqw0loc1#

你可以使用优秀的Kafka卡特工具。
示例命令:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

样本输出:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
 due to serialization error:

kafkacat头选项仅在的最新版本中可用 kafkacat ; 如果当前版本不包含主分支,您可能希望自己从主分支进行构建。
您也可以从docker运行kafkacat:

docker run --rm edenhill/kafkacat:1.5.0 \
      -b kafka-broker:9092 \
      -t my_topic_name -C \
      -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

如果使用docker,请记住如何联系kafka broker的网络含义。

6za6bjd0

6za6bjd02#

kafka-console-consumer.sh 脚本:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

src公司:https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh
kafka.tools.ConsoleConsumer 标头提供给格式化程序,但没有任何现有格式化程序使用它:

formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                     msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
                                     output)

src公司:https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/consoleconsumer.scala
在上面链接的底部,您可以看到现有的格式化程序。
如果你想打印标题,你需要实现你自己的 kafka.common.MessageFormatter 尤其是它的写入方法:

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

然后使用提供您自己的格式化程序的--formatter运行您的控制台使用者(它也应该出现在类路径上)。
另一种简单快捷的方法是使用kafkaconsumer实现自己的迷你程序,并在debug中检查头文件。

mm5n2pyu

mm5n2pyu3#

你也可以用kafkactl。e、 g.输出为yaml:

kafkactl consume my-topic --print-headers -o yaml

样本输出:

partition: 1
offset: 22
headers:
  key1: value1
  key2: value2
value: my-value

免责声明:我是这个项目的贡献者

相关问题