如何在所有消息被消费后关闭Kafka消费者?

aor9mmx1  于 2023-10-15  发布在  Apache
关注(0)|答案(4)|浏览(190)

我有下面的程序来消费所有的消息来Kafka。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

使用上面的程序,我能够消费所有的消息来Kafka。但是一旦所有的消息都被消费了,我想关闭Kafka消费者,但这并没有发生。我也需要帮助。

mm5n2pyu

mm5n2pyu1#

如果我为KafkaConsumer对象提供consumer_timeout_ms参数,我现在就可以关闭Kafka consumer。它接受以毫秒为单位的超时值。下面是代码片段。

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

在上面的代码中,如果消费者在1秒内没有看到任何消息,它将关闭会话。

hjqgdpho

hjqgdpho2#

Kafka配置参数enable.partition.eof就是您所需要的。当将此配置设置为true时。只要消费者到达分区的末尾,它就会发出PARTITION_NULL事件。所以你可以通过一些回调函数知道何时到达分区的末尾。通过这种方式,您可以选择在到达所有分区的末尾时关闭消费者。

czfnxgou

czfnxgou3#

看起来你需要consumer.close()而不是KafkaConsumer.close()。它没有被记录为静态方法。

zu0ti5jz

zu0ti5jz4#

我认为这里公认的答案并不完全准确,所以这里是我对此的看法:
你可以添加一个条件,如果满足,你可以中断for循环:

for message in consumer:
    if condition:
        break

在您的例子中,您希望在所有消息都被消费时停止,因此您必须找到一种方法来告诉消费者所有消息都已到达。
例如,您可以生成一条消息,其中可能包含该信息,然后您的条件是检查所使用的消息是否是报告所有消息都已到达的消息。
之前提到的另一个例子是假设如果在一定的时间内没有消息到达(这里建议1秒,但至少多几秒可能更好),这意味着没有更多的消息。
我做这件事的方法是检查我收到的所有ID是否至少被考虑过一次(以避免重复),但这需要你确切地知道你收到了什么,以及一些可能超出这个问题范围的逻辑,但我发现这是一个非常有用和优雅的方法来确定如何停止消费,这里是你需要的一些代码:

sum = 0
data = {
    0: None,
    1: None,
    2: None,
    3: None
}
for message in consumer:
    payload = message.value
    unique_id = payload["unique_id"]
    if data[unique_id] is None:
        data[unique_id] = payload
        sum += 1
    if len(data) == sum:
        break

如果你知道你将消耗多少消息,一个更简单的方法是像这样使用enumerate:

amount_of_messages_to_be_consumed = 40 # as an example 40
for index, message in enumerate(consumer):
    if index == amount_of_messages_to_be_consumed:
        break

当然,在你跳出for循环之后,你可以并且应该关闭消费者(但是你可能只是被困在了无尽的for循环中):

consumer.close()

相关问题