在kafka python中重置使用者组内的kafka lag(更改偏移量)

yzuktlbb  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(901)

我发现我用kafka-consumer-groups.sh工具重置了我的延迟。如何更改主题的起始偏移量?但我需要在应用程序中重置它。我找到了这个例子,但它似乎没有重置它。kafka python在使用者重新启动后读取最后生成的消息示例

consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
                             enable_auto_commit=False,
                             group_id="MyTopic.group")
    consumer.poll()
    consumer.seek_to_end()
    consumer.commit()

    ... continue on with other code...

跑步 bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe 仍然显示两个分区都有延迟。如何将当前偏移量设置为“快速向前”结束?

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST             CLIENT-ID
MyTopic         0          52110           66195           14085           kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1  kafka-python-1.4.2
MyTopic         1          52297           66565           14268           kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2  kafka-python-1.4.2
thtygnil

thtygnil1#

为了“快进”消费群体的抵消,意味着要消除滞后,你需要创建新的消费者加入同一个群体。
控制台命令是:

kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>

同时,您可以运行命令来查看所描述的延迟,您将看到延迟被清除。

avwztpqn

avwztpqn2#

您可能需要:

def consumer_from_offset(topic, group_id, offset):
    """return the consumer from a certain offset"""
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)

    return consumer

consumer = consumer_from_offset('topic', 'group', 0)
for msg in consumer:
    # it will consume the msg beginning from offset 0
    print(msg)

相关问题