我设置了3个节点confluent/kafka都指向同一个zookeeper
所有3台服务器都已播发.listener=public ipv4纯文本
服务器属性:
broker.id.generation.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://PUBLIC-IPv4:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
zookeeper.connect=10.114.16.19:2181
producer&consumer.properties文件仅更改了此值
bootstrap.servers=10.114.16.19:9092,10.114.16.21:9092,10.114.16.20:9092
当我使用节点的公共ip从远程节点运行producer时,它工作得很好,我可以发送消息和创建主题,但是消费者的问题是它在这里没有得到任何消息,这是我使用的代码
制作人.py
producer = KafkaProducer(
bootstrap_servers='PUBLIC-IP:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('slim', {'topic': 'kafka'})
消费者.py
print('Making connection.')
consumer = KafkaConsumer(bootstrap_servers='PUBLIC-IP:9092')
print('Assigning Topic.')
consumer.assign([TopicPartition('slim', 2)])
print('Getting message.')
for message in consumer:
print("OFFSET: " + str(message[0])+ "\t MSG: " + str(message))
当我运行consumerpy客户端时,它只是保持打开状态,没有收到任何消息,只是为了澄清我在网上找到的上面的测试代码,我没有编写它,因为我还在学习kafkaapi
1条答案
按热度按时间wko9yo5t1#
首先确保你能
PUBLIC-IPPUBLIC-IP
你不会陷入dns问题。然后安装kafkacat
如果您在linux平台上,并通过以下命令测试它是否可以使用数据: