我是Kafka的新人。借助kafka python的一些在线教程,我编写了以下代码:
from kafka import SimpleProducer, KafkaClient, KafkaConsumer
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages(b'my-topic', b'this method', b'Hello World')
consumer = KafkaConsumer('my-topic',
group_id='my_group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
但问题是,在最后一个for循环中,代码执行被卡住了,我无法理解。
1条答案
按热度按时间vwhgwdsa1#
你原来的代码是对的。我运行你的代码。kafkaconsumer可以用来消费消息。如果您打开另一个控制台,并运行相同的原始代码,您将看到输出。
在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html,有很多消费类,比如simpleconsumer、kafkaconsumer、consumer等等。为什么你的代码被困在for循环中?因为代码中的使用者默认设置为使用新消息。在这种情况下,consumer不会使用producer.send_messages()函数生成的消息。
顺便说一下,如果使用simpleconsumer,可以使用seek()设置要使用的消息的偏移量。