KafkaConsumer不从主题返回数据

nhaq1z21  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(116)

我之前从csv文件中填充了几个主题。我想在Jupyter笔记本中获取数据并打印。由于我遇到了麻烦(没有错误,只是无休止的执行),我决定创建另一个主题“numtest”(手动创建),并使用python producer填充它:

producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
value_serializer=lambda x:
dumps(x).encode('utf-8'))

print(str(producer.bootstrap_connected()))

for e in range(100):
data = {'number' : e}
producer.send('numtest', value=data)

print('Done')

字符串
返回“True”和“Done”。然而,当我试图从控制台检查主题时,我得到了这个:root@19f9bec85da8:/opt/Kafka/bin#kafka-console-consumer.sh--topic numtest --bootstrap-server kafka:9092 --from-begin
共处理0条消息

我想,它是空的。
Jupyter中的下一个代码块如下:

consumer = KafkaConsumer('numtest', bootstrap_servers=['kafka:9092'],value_deserializer=lambda x: json.loads(x.decode('utf-8')),auto_offset_reset='earliest')

topics = consumer.topics()
print(topics)

print(consumer.subscription())
data = consumer.poll(max_records = 1)

print(data)


它返回'numtest','water_quality','water_quality_by_water_body'},然后返回'numtest'}。
最后,我没有得到任何数据。没有打印,没有错误。
所有这些都是在Docker中完成的。
我尝试使用我手动创建并从CSV填充的主题。此外,尝试使用KafkaProducer来获得带有int number的简单主题。我想要的结果-主题(如果从CSV文件填充,控制台消费者可以完美读取)被提取到Jupyter notebook中,所以我可以处理数据。如果我知道为什么我的KafkaProducer populated主题(数字从1到100)可以从控制台读取和提取,那就更好了。

oxosxuxt

oxosxuxt1#

生产者缓冲数据,不会立即发送。您需要在打印Done之前刷新它,以确保缓冲区为空
您还需要缩进for循环。否则,您只发送一个事件
对于使用者,如果需要所有消息,并且需要将自动偏移量重置设置为最早偏移量,则应该将轮询作为循环的一部分

相关问题