重新启动kafka python多进程使用者将再次消耗队列中的所有消息

sqserrrh  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(507)

ref:重新启动kafka(python)使用者会再次消耗队列中的所有消息
我对Kafka还不熟悉,我也在努力处理补偿管理问题。
使用最新版本的apache kafka(0.8.1.1.),其中kafka python 0.9.2是从pypi安装的(上次上载于2014-08-27),这与github上当前的主分支不同。
当使用“simpleconsumer”=>进行测试时,崩溃并重新启动脚本会消耗来自最后一个已知偏移量的消息。
使用“multiprocessconsumer”进行测试时=>崩溃并重新启动脚本将从偏移量“0”重新开始使用
我的小脚本(多进程):

from kafka import KafkaClient, MultiProcessConsumer
KFK = KafkaClient("localhost:9092")
consumer = MultiProcessConsumer(KFK, "my-group1", "my-topic", num_procs=2)

我可以通过以下方式检查偏移:

consumer.offsets
{0: 0, 1: 0}

然后,我跑:

A = consumer.get_messages(count=1235)
consumer.offsets
{0: 1235, 1: 0}

再次崩溃并重新启动脚本后,第一次调用“consumer.offsets”返回“{0:1235,1:0}”,这很好。但是跑步:

A.consumer.get_messages(count=388)
consumer.offsets
{0: 388, 1: 0}

你知道怎么处理这个问题吗?此外,是否有任何方法可以正确地更改多进程消费程序偏移量以从定义的位置开始?
谢谢你的帮助。
编辑:在深入了解kafka python lib源代码并检查github上的问题之后,请参阅:https://github.com/mumrah/kafka-python/issues/173
所以问题是,当主多进程使用者启动子进程时,它会在主题的每个分区上将它们的偏移量初始化为“0”(因为子进程的autocommit设置为false),而不是给它们正确的值。
请参阅github上的“mahall”注解。

drkbr07n

drkbr07n1#

这取决于消费者要求Kafka经纪人补偿的方式。很可能您正在java中执行与此等效的操作

readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

试试这个

readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);

相关问题