我有一个spark流scala应用程序,它从kafka主题读取数据并将其放在hdfs上。我希望应用程序将读取消息的偏移量存储到消费者偏移量主题,以便在应用程序失败时从中开始读取。这个应用运行得很好(我可以看到hdfs上的数据),但是我看不到它提交给消费者的偏移量。
这是我的Kafka帕拉姆:
val kafkaParams = Map(
"metadata.broker.list" -> "xx.xxx.x.xx:6667",
"enable.auto.commit" -> "true",
"group.id" -> "reading_telemetry",
"offsets.storage" -> "kafka"
)
我用来从uu消费者u偏移量中获取提交偏移量的命令如下:
$ /usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config --zookeeper xx.xxx.x.xx:2181 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
我得到了一些关于表单的信息
[test1,test,0]::[offsetmetadata[55,no\ metadata],提交时间1539603328309,过期时间6723603328309]
但是我没有看到任何关于“reading\u telemetry”组id的提交。知道吗,为什么?
我的环境:
Kafka:1.0.1Spark:2.3.1斯卡拉:2.11.8
2条答案
按热度按时间x9ybnkn61#
你不应该直接从书里读
__consumer_offsets
主题。这是一个内部主题,您应该使用工具来检索提交的偏移量。最简单的方法是运行
kafka-consumer-groups
工具:列
CURRENT-OFFSET
包含提交的偏移量。r1zhe5dt2#
使用kafka-consumer-groups.sh脚本,如下所示:
它将以以下格式返回信息: