来自python nobrokers的kafka消费者

6vl6ewon  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(293)

我用docker来运行Kafka制作人的命令

kafka-console-producer.sh --topic USER_CREATED_TOPIC --broker-list xxx.xx.x.x:9092`

其中x是来自分配的代理ip的数字。
我的 server.properties 文件包含

advertised.port=9092
advertised.host.name=xxx.xx.x.x.
listeners=PLAINTEXT://xxx.xx.x.x:9092 line
advertised.listeners=PLAINTEXT://xxx.xx.x.x:9092

每当我用命令从docker容器启动消费者时

kafka-console-consumer.sh --topic USER_CREATED_TOPIC --from-beginning --bootstrap-server xxx.xx.x.x:9092

然后在我的producer控制台中写一些东西,我在consumer中得到结果
但是,当我尝试通过python脚本连接时,使用:

from kafka import KafkaConsumer
  consumer = 
  kafkaConsumer("USER_CREATED_TOPIC",bootstrap_servers= 
  ['xxx.xx.x.x:9092'])
for msg in consumer:
     print (msg)

我得到一个 NoBrokersAvailable 错误。
我读了stackoverflow上的几个线程(列出了 server.properties 基于这些答案),但我仍然无法通过python连接到Kafka制作人。
感谢您的帮助。

inn6fuwd

inn6fuwd1#

我看到的唯一语法问题是服务器地址不应该在列表中,因此如下所示:

from kafka import KafkaConsumer
consumer = KafkaConsumer('sample', bootstrap_servers='0.0.0.0:9092')
for message in consumer:
    print(message)

没有运行的集群也会出现这个错误。首先创建一个主题:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample

然后测试是否存在:

bin/kafka-topics.sh --list --zookeeper localhost:2181

# sample

最后,确保有一个生产者正在创建要收听的消息:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='0.0.0.0:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

相关问题