我发现我用kafka-consumer-groups.sh工具重置了我的延迟。如何更改主题的起始偏移量?但我需要在应用程序中重置它。我找到了这个例子,但它似乎没有重置它。kafka python在使用者重新启动后读取最后生成的消息示例
consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
enable_auto_commit=False,
group_id="MyTopic.group")
consumer.poll()
consumer.seek_to_end()
consumer.commit()
... continue on with other code...
跑步 bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe
仍然显示两个分区都有延迟。如何将当前偏移量设置为“快速向前”结束?
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyTopic 0 52110 66195 14085 kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1 kafka-python-1.4.2
MyTopic 1 52297 66565 14268 kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2 kafka-python-1.4.2
2条答案
按热度按时间thtygnil1#
为了“快进”消费群体的抵消,意味着要消除滞后,你需要创建新的消费者加入同一个群体。
控制台命令是:
同时,您可以运行命令来查看所描述的延迟,您将看到延迟被清除。
avwztpqn2#
您可能需要: