我已经用 kafka-python
将消息写入和读取到的库 kafka
. 我写信息没有任何问题;我可以使用 kafka
控制台工具。但是我不能用我的python脚本读取它们。我在我的消费者上有一个for,它在迭代的第一行冻结,永远不会返回。这是我的密码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
消费者被完全创建和订阅;我看得出来 my-topic
列在其主题列表中 _client
财产。
你知道吗?
1条答案
按热度按时间8aqjt8rx1#
默认情况下,kafka python从最后一个偏移量开始,ie将只读取新的mesages。一种方法是从头开始读取,或者另一种方法是将轮询主题保持在无限循环中,如下代码所示:
编辑
要从头开始读,我们需要添加
auto_offset_reset=earliest
使消费者对象如果有帮助,请告诉我!!