可以在控制台中使用kafka消息,但不能使用python库?

ndh0cuux  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(387)

我在笔记本电脑上做本地的事情,并试图从远程服务器“xx”读取一个主题“test”。使用控制台时,我启动zookeeper、kafka,然后启动consumer:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-consumer.sh --bootstrap-server xxxxx:9092 --topic test --from-beginning

消息将显示在控制台中。但是当使用python库如下时,我什么也看不到:

from kafka import KafkaConsumer

server = {'server': 'xxxxx:9092', 'topic': 'test'}

# To consume latest messages and auto-commit offsets

consumer = KafkaConsumer(server['topic'],
                         group_id='my-group',
                         bootstrap_servers=server['server'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

我还可以成功地将本地消息从控制台中的kafka发送到python kafka使用者,这个问题只在尝试使用远程消息时发生。另外,与远程服务器的连接似乎已建立良好(它可以看到我),但不幸的是,没有收到任何消息。

w7t8yxp5

w7t8yxp51#

我找到的解决方案是使用另一个库,confluent kafka python,这个库只需配置服务器ip和要侦听的主题的名称,就可以开箱即用
编辑:以下是我实施的解决方案:
我以为avro库只是读取avro文件,但它实际上解决了解码kafka消息的问题,如下所示:我首先导入库并将schema文件作为参数,然后创建一个函数将消息解码到字典中,我可以在consumer循环中使用。

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

相关问题