使用kafka python检索主题中的消息

e0bqpujr  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我已经用 kafka-python 将消息写入和读取到的库 kafka . 我写信息没有任何问题;我可以使用 kafka 控制台工具。但是我不能用我的python脚本读取它们。我在我的消费者上有一个for,它在迭代的第一行冻结,永远不会返回。这是我的密码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))

消费者被完全创建和订阅;我看得出来 my-topic 列在其主题列表中 _client 财产。
你知道吗?

8aqjt8rx

8aqjt8rx1#

默认情况下,kafka python从最后一个偏移量开始,ie将只读取新的mesages。一种方法是从头开始读取,或者另一种方法是将轮询主题保持在无限循环中,如下代码所示:

while True:
    try:
        records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min

        record_list = []
        for tp, consumer_records in records.items():
            for consumer_record in consumer_records:
                record_list.append(consumer_record.value)
        print(record_list) # record_list will be list of dictionaries

编辑

要从头开始读,我们需要添加 auto_offset_reset=earliest 使消费者对象

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
    auto_offset_reset='earliest')

如果有帮助,请告诉我!!

相关问题