通过gcp上的python脚本访问kafka producer服务器

vsikbqxv  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(625)

我在谷歌云平台集群上成功连接了一位Kafka制作者和消费者:

  1. $ cd /usr/lib/kafka
  2. $ bin/kafka-console-producer.sh config/server.properties --broker-list \
  3. PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test

在新的壳中执行

  1. $ cd /usr/lib/kafka
  2. $ bin/kafka-console-consumer.sh --bootstrap-server \
  3. PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test \
  4. --from-beginning

现在,我想使用以下python脚本向kafka producer服务器发送消息:

  1. from kafka import *
  2. topic = 'test'
  3. producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092',
  4. api_version=(0,10))
  5. producer.send(topic, b"Test test test")

但是,这会导致 KafkaTimeoutError :

  1. "Failed to update metadata after %.1f secs." % (max_wait,))
  2. kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

在网上四处看看让我考虑:
取消注解 listeners=... 以及 advertised.listeners=.../usr/lib/kafka/config/server.properties 文件。
然而, listeners=PLAINTEXT://:9092 不起作用,此帖子建议设置 PLAINTEXT://<external-ip>:9092 .
因此,我开始考虑通过gcp集群的外部(静态)ip地址访问kafka服务器。然后,我们建立了一个防火墙规则来访问端口(?),并允许https访问集群。但我不确定这是否是对问题的过度处理。
我确实需要一些指导来从python脚本成功地连接到kafka服务器。

dzhpxtsq

dzhpxtsq1#

你需要设置 advertised.listeners 你的客户连接到的地址。
更多信息:https://rmoff.net/2018/08/02/kafka-listeners-explained/

d7v8vwbk

d7v8vwbk2#

谢谢罗宾!你发布的链接非常有助于找到下面的工作配置。
尽管事实上 SimpleProducer 似乎是一种不受欢迎的方法,以下设置最终对我有效:
python脚本:

  1. from kafka import *
  2. topic = 'test'
  3. kafka = KafkaClient('[project-name]-w-0.c.[cluster-id].internal:9092')
  4. producer = SimpleProducer(kafka)
  5. message = "Test"
  6. producer.send_messages(topic, message.encode('utf-8'))

/usr/lib/kafka/config/server.properties 文件:

  1. listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092
  2. advertised.listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092

相关问题