无法从停靠环境中的flask连接到kafka

ctzwtxfj  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(541)

我正在尝试构建一个以Kafka为接口的flask应用程序。我使用了一个python连接器kafkapython和一个docker图像spotify/kafkaproxy。
下面是docker compose文件。

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka

下面是我用来连接Kafka的python片段。在这里,我使用了Kafka容器的别名 kafka 连接,因为docker负责将别名Map到它的ip地址。

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

我得到了 NoBrokersAvailable 错误。由此,我可以理解flask应用程序找不到kafka服务器。

Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics,**self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

其他观察结果:
我能跑了 ping kafka 从 flask 容器中取出,从Kafka容器中取出包。
当我在本地运行flask应用程序时,试图通过设置 BOOTSTRAP_SERVERS = ['localhost:9092'] ,工作正常。

imzjd6km

imzjd6km1#

更新
正如cricket\u007所提到的,考虑到您正在使用下面提供的docker compose,您应该使用 kafka:29092 从另一个容器连接到Kafka。所以您的代码如下所示:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

结束更新
我建议您使用confluent公司的kafka图像,它们有各种使用docker compose的示例设置,可以随时使用,并且它们总是在更新。
试试这个:

---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app

我使用了docker-compose.yml并在上面添加了您的服务请注意:
这里使用的配置为到代理的外部连接(即来自docker网络外部的连接)公开端口9092。这可能来自运行docker的主机,如果您有更复杂的设置,也可能来自更远的地方。如果后者为真,则需要将kafka\u播发的\u侦听器中的值“localhost”更改为可从这些远程客户端解析到docker主机的值
请确保查看其他示例,这些示例可能对您很有用,特别是在迁移到生产环境时:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples
也值得检查:
似乎需要指定api\ U版本以避免此错误。更多详情请点击这里。
这个库的1.3.5版本(pypy上的最新版本)只列出了某些api版本0.8.0到0.10.1。因此,除非您显式地将api\ U版本指定为(0,10,1),否则客户端库尝试发现该版本将导致nobrokersavailable错误。

producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

这应该是可行的,有趣的是,设置api\ U版本会根据以下内容意外地解决问题:
设置api\ U版本时,客户端将不会尝试探测代理以获取版本信息。所以探测操作失败了。版本探测连接和常规连接之间的一个很大区别是,前者只尝试在每个连接(每个代理)的一个接口上进行连接,而后者(常规操作)将在所有接口之间不断循环,直到连接成功#1411通过切换版本探测逻辑以尝试在所有找到的接口上建立连接来修复此问题。
这里描述了实际问题

xtfmy6hx

xtfmy6hx2#

我用一个名为 stream_net 在所有服务之间。


# for local development

version: "3.7"
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - stream_net

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - stream_net

  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - "9000:5000"
    volumes:
      - ./flask-app:/app
    networks:
      - stream_net
    depends_on:
      - kafka

networks:
  stream_net:

从容器外部连接 localhost:9092 网络内的连接 kafka:29092 当然,将所有已经在网络中运行的容器放在网络中是很奇怪的。但通过这种方式,容器可以按其实际名称命名。也许有人能准确地解释这是如何工作的,或者它能帮助其他人理解问题的核心并正确地解决问题。

相关问题