我正试图找出我目前的高级消费者正在努力抵消哪些影响。我使用kafka 0.8.2.1,在kafka的server.properties中没有设置“offset.storage”,我认为这意味着偏移量存储在kafka中(我还通过检查zk shell中的路径验证了zookeeper中没有存储偏移: /consumers/consumer_group_name/offsets/topic_name/partition_number
)
我试着听听 __consumer_offsets
看看哪个消费者节省了什么价值的补偿,但它没有工作。。。
我尝试了以下方法:
为控制台使用者创建了配置文件,如下所示:
=> more kafka_offset_consumer.config
exclude.internal.topics=false
并尝试了两个版本的控制台使用者脚本:
# 1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181
# 2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
两者都不起作用-它只是坐在那里,但不打印任何东西,即使消费者积极消费/储蓄抵消。
我是否缺少其他配置/属性?
谢谢!
玛丽娜
5条答案
按热度按时间qc6wkl3g1#
从kafka0.11开始,(scala)源代码可以在这里找到
对于那些需要java翻译的用户,假设您得到了一个
ConsumerRecord<byte[], byte[]> consumerRecord
,您可以使用获取密钥(首先检查密钥是否为空)并使用
GroupMetadataManager.readMessageKey(consumerRecord.key)
. 它可以返回不同的类型,所以请检查if ( ... instanceof OffsetKey)
,然后将其强制转换,可以从中获得各种值。要获取偏移量的kafka记录值,可以使用
String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))
一个从scala代码翻译过来的java示例。。。注意,也可以使用adminclientapi来描述组,而不是使用这些原始消息
listconsumergroupoffsets():查找特定组的所有偏移
descripbeconsumergroups():查找组成员的详细信息
scala源代码提取
gjmwrych2#
如果你加上
--from-beginning
它很可能会给你一些结果,至少在我尝试的时候是这样。或者,如果您没有提供该参数,但在让消费者侦听时读取了更多消息(并触发偏移量提交),那么也应该在那里显示消息。zzwlnbp83#
我在尝试从消费补偿主题中消费时遇到了这个问题。我设法找到了不同版本的Kafka,并认为我会分享我的发现
对于Kafka0.8.2.x
注意:这使用zookeeper连接
对于Kafka0.9.x.x和0.10.x.x
对于0.11.x.x-2.x
fzwojiic4#
对于kafka-2.x,使用以下命令
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
pvabu6sv5#
好的,我已经知道问题出在哪里了。我的Kafka实际上是使用zookeeper作为偏移存储,而不是Kafka。。。。我没有立即检测到的原因是我检查zk内容不正确:
我在做什么
什么也看不见。相反,我必须“获取”内容,这确实为我的消费者显示了正确的补偿,如下所示: