我有一个消费者订阅了一个测试主题,其中生产者线程定期发布。我想能够阻止消费者线程,直到一个新的消息是对-然后处理,并开始再次等待。最近的一次是:
consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
bootstrap_servers=[localhost_],
api_version=(0, 10), consumer_timeout_ms=1000)
while True:
print(consumer.poll(timeout_ms=5000))
有没有更惯用的方法(或者这种方法有什么我看不出来的严重问题)?
新来的Kafka所以对这种设计的一般建议非常欢迎。完整(运行)示例:
import time
from threading import Thread
import kafka
from kafka import KafkaProducer, KafkaConsumer
print('python-kafka:', kafka.__version__)
def publish_message(producer_instance, topic_name, key, value):
try:
key_bytes = bytes(str(key), encoding='utf-8')
value_bytes = bytes(str(value), encoding='utf-8')
producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
producer_instance.flush()
except Exception as ex:
print('Exception in publishing message\n', ex)
localhost_ = 'localhost:9092'
def kafka_producer():
_producer = None
try:
_producer = KafkaProducer(bootstrap_servers=[localhost_],
api_version=(0, 10))
except Exception as ex:
print('Exception while connecting Kafka')
print(str(ex))
j = 0
while True:
publish_message(_producer, topic_name, value=j, key=j)
j += 1
time.sleep(5)
if __name__ == '__main__':
print('Running Producer..')
topic_name = 'test'
prod_thread = Thread(target=kafka_producer)
prod_thread.start()
consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
bootstrap_servers=[localhost_],
api_version=(0, 10), consumer_timeout_ms=1000)
# consumer.subscribe([topic_name])
while True:
print(consumer.poll(timeout_ms=5000))
``` `python-kafka: 1.3.5`
1条答案
按热度按时间dz6r00yl1#
在一个无限循环中投票是Kafka的建议:最终指南。下面是第4章的java摘录。Kafka消费者:从Kafka读取数据时使用了相同的想法:
这很好地解释了如何在python中推荐使用这些库。
kafka python(请参阅《两个kafka客户机的故事》中的完整示例)
合流kafka-python(请参阅apachekafka-for-python程序员简介中的完整示例)
另一个密切相关的问题是如何处理这些消息。
这部分代码可能依赖于外部依赖项(数据库、远程服务、网络文件系统等),这可能导致处理尝试失败。
因此,实现一个重试逻辑可能是一个好主意,您可以在ApacheKafka中的博客文章retrying consumer架构中找到一个很好的描述。