我有一个关于python中使用kafkapython模块的代码的问题,我真的不知道为什么它不能工作。在我初始化zookeeper&kafka服务器并在我的计算机上创建at主题后,我尝试在我的智能手机上使用termux运行以下代码:
from json import dumps, loads
from kafka import KafkaProducer, KafkaConsumer
class CommunicationLayer:
def __init__(self, node_id):
self.output_publisher = KafkaProducer(bootstrap_servers=['192.168.1.103:9092'], value_serializer=lambda x:
dumps(x).encode('utf-8'))
self.window_receiver = KafkaConsumer("node_{}".format(node_id), bootstrap_servers=['192.168.1.103:9092'] , value_deserializer=lambda x: loads(x.decode('utf-8')))
...
(code that doesn't really help)
...
(ip指向运行kafka服务器的计算机)。只要我运行这段代码,就会得到nobrokeravailable错误,据我所知,只有在您还没有初始化服务器时才会出现。我错过什么了吗?
1条答案
按热度按时间ee7vknir1#
我试着在我的智能手机上运行以下代码
注意:kafka客户端不打算在网络连接不可靠的环境中运行。
ip指向运行kafka服务器的机器
zookeeper和kafka是不是在这台机器上暴露在外面,而不是只在本地听?
同样,您需要设置
advertised.listeners
kafka属性中的属性为任何客户机在引导连接之后能够实际连接的外部地址。