在kafka中看不到提交给消费者补偿主题的补偿

ybzsozfc  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(323)

我有一个spark流scala应用程序,它从kafka主题读取数据并将其放在hdfs上。我希望应用程序将读取消息的偏移量存储到消费者偏移量主题,以便在应用程序失败时从中开始读取。这个应用运行得很好(我可以看到hdfs上的数据),但是我看不到它提交给消费者的偏移量。
这是我的Kafka帕拉姆:

  1. val kafkaParams = Map(
  2. "metadata.broker.list" -> "xx.xxx.x.xx:6667",
  3. "enable.auto.commit" -> "true",
  4. "group.id" -> "reading_telemetry",
  5. "offsets.storage" -> "kafka"
  6. )

我用来从uu消费者u偏移量中获取提交偏移量的命令如下:

  1. $ /usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config --zookeeper xx.xxx.x.xx:2181 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

我得到了一些关于表单的信息
[test1,test,0]::[offsetmetadata[55,no\ metadata],提交时间1539603328309,过期时间6723603328309]
但是我没有看到任何关于“reading\u telemetry”组id的提交。知道吗,为什么?
我的环境:
Kafka:1.0.1Spark:2.3.1斯卡拉:2.11.8

x9ybnkn6

x9ybnkn61#

你不应该直接从书里读 __consumer_offsets 主题。这是一个内部主题,您应该使用工具来检索提交的偏移量。
最简单的方法是运行 kafka-consumer-groups 工具:

  1. kafka-consumer-groups.sh \
  2. --bootstrap-server [BOOTSTRAP_SERVERS] \
  3. --describe \
  4. --group reading_telemetry

CURRENT-OFFSET 包含提交的偏移量。

r1zhe5dt

r1zhe5dt2#

使用kafka-consumer-groups.sh脚本,如下所示:

  1. kafka-consumer-groups.sh --bootstrap-server <BootStrapServerIP:port> --describe --group telemetryGroup

它将以以下格式返回信息:

  1. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
  2. telemetryGroup test-topic 0 15 15 0 telemetryGroup-1/127.0.0.1
  3. telemetryGroup test-topic 1 14 15 1 telemetryGroup-2_/127.0.0.1

相关问题