如何重置、更新或清除kafka中某个主题的每个消费者组的偏移量?

afdcj2ne  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(883)

我有一个主题,比如test001,假设主题中有10000条消息。我有两个消费组,比如test-group1和test-group2,用于消费来自上述主题的消息。
如果test-group1消费者消费了4000条消息,test-group2消费者消费了4500条消息,那么我该怎么做:
是否将测试组1使用者组的偏移量重置为0?
是否将test-group1消费者组偏移量更新为4500?
是否从主题中删除消息并将所有使用者组的偏移量重置为0?

eh57zj3b

eh57zj3b1#

我不认为你可以重置消费者群体层面的抵消。你可以用 seek 方法(在java客户机api中)移动到分区的开始(偏移量0)、结束或您选择的任何其他偏移量。尝试探索一些cli选项,如kafka-consumer-groups.sh、kafka-topics.sh

a1o7rhls

a1o7rhls2#

偏移量是为每个主题+分区+group.id存储的,而不是整个主题的。您不能从\u consumer-offset主题中删除已提交的偏移量、仅提交较新的偏移量或等待它们过期(默认值为24小时)。
在0.11中,将有一个偏移管理工具,因此您可以独立于消费应用程序更改cli的偏移。

gxwragnw

gxwragnw3#

此票证显示可以使用特殊的“\uu admin\u client”id直接生成到\uu consumer\u offset主题以覆盖偏移:
https://issues.apache.org/jira/browse/kafka-5246
我不熟悉主题消息的格式。这篇文章可能会有所帮助,但你需要做更多的挖掘自己:
http://dayooliyide.com/post/kafka-consumer-offsets-topic/
编写一个给定组id的应用程序,对给定的位置进行搜索并提交偏移量,可能更简单。

相关问题