我在笔记本电脑上做本地的事情,并试图从远程服务器“xx”读取一个主题“test”。使用控制台时,我启动zookeeper、kafka,然后启动consumer:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-consumer.sh --bootstrap-server xxxxx:9092 --topic test --from-beginning
消息将显示在控制台中。但是当使用python库如下时,我什么也看不到:
from kafka import KafkaConsumer
server = {'server': 'xxxxx:9092', 'topic': 'test'}
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(server['topic'],
group_id='my-group',
bootstrap_servers=server['server'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
我还可以成功地将本地消息从控制台中的kafka发送到python kafka使用者,这个问题只在尝试使用远程消息时发生。另外,与远程服务器的连接似乎已建立良好(它可以看到我),但不幸的是,没有收到任何消息。
1条答案
按热度按时间w7t8yxp51#
我找到的解决方案是使用另一个库,confluent kafka python,这个库只需配置服务器ip和要侦听的主题的名称,就可以开箱即用
编辑:以下是我实施的解决方案:
我以为avro库只是读取avro文件,但它实际上解决了解码kafka消息的问题,如下所示:我首先导入库并将schema文件作为参数,然后创建一个函数将消息解码到字典中,我可以在consumer循环中使用。