我用下面的代码做了一个测试,将数据发送到主题。Kafka是
kafka_2.12-1.1.0
代码是
import kafka
print(kafka.version.__version__)
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['172.25.44.238:9092'],
sasl_mechanism="PLAIN",
api_version=(0, 10),
retries=2
)
f = producer.send("test", "some")
f.get()
如果我这样更改服务器配置:
listeners=PLAINTEXT://172.25.44.238:9092
然后我的代码可以将数据发送到我的主题
如果我像这样更改服务器配置(默认设置):
listeners=PLAINTEXT://:9092
然后我的代码会出错:
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Batch for TopicPartition(topic='test', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
区别在于sencond默认使用主机名。是的,我的机器运行的生产者代码不能重新爱上Kafka主机名。但我也没有在生产者代码中使用主机名。所以它不应该引起错误。为什么主机名很重要?
1条答案
按热度按时间svdrlsy41#
我认为你误解了“自举”的概念。
您提供的地址仅建立初始连接。客户端实际使用的地址由
advertised.listeners
.这个
listeners
应该一直是://0.0.0.0
,在我看来,那么您可以使用操作系统级别的防火墙设置来限制访问。是的,默认值是主机名,这意味着只有该主机可以与代理通信