我正在尝试构建一个以Kafka为接口的flask应用程序。我使用了一个python连接器kafkapython和一个docker图像spotify/kafkaproxy。
下面是docker compose文件。
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka
下面是我用来连接Kafka的python片段。在这里,我使用了Kafka容器的别名 kafka
连接,因为docker负责将别名Map到它的ip地址。
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
我得到了 NoBrokersAvailable
错误。由此,我可以理解flask应用程序找不到kafka服务器。
Traceback (most recent call last):
File "./app.py", line 11, in <module>
consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
self._client = KafkaClient(metrics=self._metrics,**self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
其他观察结果:
我能跑了 ping kafka
从 flask 容器中取出,从Kafka容器中取出包。
当我在本地运行flask应用程序时,试图通过设置 BOOTSTRAP_SERVERS = ['localhost:9092']
,工作正常。
2条答案
按热度按时间imzjd6km1#
更新
正如cricket\u007所提到的,考虑到您正在使用下面提供的docker compose,您应该使用
kafka:29092
从另一个容器连接到Kafka。所以您的代码如下所示:结束更新
我建议您使用confluent公司的kafka图像,它们有各种使用docker compose的示例设置,可以随时使用,并且它们总是在更新。
试试这个:
我使用了docker-compose.yml并在上面添加了您的服务请注意:
这里使用的配置为到代理的外部连接(即来自docker网络外部的连接)公开端口9092。这可能来自运行docker的主机,如果您有更复杂的设置,也可能来自更远的地方。如果后者为真,则需要将kafka\u播发的\u侦听器中的值“localhost”更改为可从这些远程客户端解析到docker主机的值
请确保查看其他示例,这些示例可能对您很有用,特别是在迁移到生产环境时:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples
也值得检查:
似乎需要指定api\ U版本以避免此错误。更多详情请点击这里。
这个库的1.3.5版本(pypy上的最新版本)只列出了某些api版本0.8.0到0.10.1。因此,除非您显式地将api\ U版本指定为(0,10,1),否则客户端库尝试发现该版本将导致nobrokersavailable错误。
这应该是可行的,有趣的是,设置api\ U版本会根据以下内容意外地解决问题:
设置api\ U版本时,客户端将不会尝试探测代理以获取版本信息。所以探测操作失败了。版本探测连接和常规连接之间的一个很大区别是,前者只尝试在每个连接(每个代理)的一个接口上进行连接,而后者(常规操作)将在所有接口之间不断循环,直到连接成功#1411通过切换版本探测逻辑以尝试在所有找到的接口上建立连接来修复此问题。
这里描述了实际问题
xtfmy6hx2#
我用一个名为
stream_net
在所有服务之间。从容器外部连接
localhost:9092
网络内的连接kafka:29092
当然,将所有已经在网络中运行的容器放在网络中是很奇怪的。但通过这种方式,容器可以按其实际名称命名。也许有人能准确地解释这是如何工作的,或者它能帮助其他人理解问题的核心并正确地解决问题。