Docker Kafka with Python consumer

1sbrub3j  posted on 2021-06-06  in Kafka

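I am using dockerized Kafka and have written a Kafka consumer program. It works perfectly when I run Kafka in Docker and the application on my local machine. But when I run the application in Docker as well, I run into problems. The issue may be that the topic has not been created before the application starts.
docker-compose.yml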

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
     - "5000:5000"

parse-engine.py

from kafka import KafkaConsumer
import json

try:
    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
    for message in consumer:
        print(message)
except Exception as e:
    print(e)
    # Logs the error appropriately. 
    pass

Error log

kafka_1         | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)

**parse-engine_1  | Welcome to parse engine

parse-engine_1  | NoBrokersAvailable 
parseengine_parse-engine_1 exited with code 0**
kafka_1         | creating topics: test:1:1

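I have already added the depends_on property in docker-compose, but the application still tries to connect before the topic is created, and the error occurs.
I read that I could add a script to the docker-compose file, but I am looking for a simpler way.
Thanks for your help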


rhfm7lfc1#

This line

KAFKA_ADVERTISED_HOST_NAME: localhost

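says that the broker advertises itself as reachable only on localhost, which means every Kafka client is told to connect back to itself rather than being given the real broker address. That is fine if your clients run only on your host: requests always go to localhost, which is forwarded to the container.
For apps in other containers, however, the clients need to point at the Kafka container, so it should say KAFKA_ADVERTISED_HOST_NAME: kafka, where kafka is the name of the Docker Compose service. Clients in other containers will then try to connect to that container.
That being said, with this line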

consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')

you are pointing the Python container at itself, not at the kafka container.
It should say kafka:9092 instead.
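
Even with the address corrected, depends_on only controls the order in which containers are started; it does not wait until the broker accepts connections, so the consumer can still hit NoBrokersAvailable during start-up, as the log in the question suggests. One simple option is to retry inside the application. The following is a minimal sketch under stated assumptions: connect_with_retry and make_consumer are hypothetical helper names, and the attempt count and delay are arbitrary values, not recommendations.

```python
import time


def connect_with_retry(factory, retry_on=(Exception,), attempts=10, delay=3.0,
                       sleep=time.sleep):
    """Call factory() until it succeeds or the attempts run out.

    factory  -- zero-argument callable that returns a connected client
    retry_on -- exception types that mean "broker not ready yet"
    """
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as exc:
            if attempt == attempts:
                raise  # give up and surface the last error
            print(f"attempt {attempt} failed ({exc!r}); retrying in {delay}s")
            sleep(delay)


def make_consumer():
    # Imported here so the retry helper above does not depend on kafka-python.
    from kafka import KafkaConsumer
    from kafka.errors import NoBrokersAvailable

    # 'kafka:9092' assumes the Compose service is named kafka, as in the question.
    return connect_with_retry(
        lambda: KafkaConsumer('test', bootstrap_servers='kafka:9092'),
        retry_on=(NoBrokersAvailable,),
    )
```

In parse-engine.py, the direct KafkaConsumer(...) call could then be swapped for make_consumer(), leaving the message loop unchanged.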


htzpubme2#

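In my case, I wanted to access the Kafka container from an external Python client running locally (as a producer). Below is the combination of containers and Python code that worked for me (platform: Mac OS, Docker version 2.4.0):
ZooKeeper container: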

docker run -d \
-p 2181:2181 \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.2.3

Kafka container:

docker run -d \
-p 29092:29092 \
-p 9092:9092 \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,BROKER://localhost:9092 \
-e KAFKA_INTER_BROKER_LISTENER_NAME=BROKER \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_CREATE_TOPICS="test:1:1" \
confluentinc/cp-enterprise-kafka:5.2.3

Python client:

from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         security_protocol='PLAINTEXT')
acc_ini = 523416
print("Sending message")
producer.send('test', {'model_id': '1','acc':str(acc_ini), 'content':'test'})
producer.flush()
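
The producer above serializes each value to UTF-8 JSON bytes, so a consumer reading the same topic needs the inverse step. The following is a minimal sketch, not part of the original answer: serialize, deserialize and consume are hypothetical helper names, and auto_offset_reset='earliest' is an assumption made so that messages sent before the consumer started are also read.

```python
import json


def serialize(value):
    # Same transformation as the producer's value_serializer above.
    return json.dumps(value).encode('utf-8')


def deserialize(raw):
    # Inverse of serialize(): UTF-8 JSON bytes back to a Python object.
    return json.loads(raw.decode('utf-8'))


def consume(bootstrap='localhost:29092'):
    # Imported lazily so the two helpers above work without kafka-python.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'test',
        bootstrap_servers=[bootstrap],
        value_deserializer=deserialize,
        auto_offset_reset='earliest',  # assumption: start from the oldest message
    )
    for message in consumer:
        # message.value is already a dict thanks to value_deserializer.
        print(message.value)
```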

w6lpcovy3#

Your problem is the networking. In your Kafka config you have set

KAFKA_ADVERTISED_HOST_NAME: localhost

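But this means that any client (including your Python app) will connect to the broker and then be told by the broker to use localhost for all further connections. Since localhost as seen from the client (e.g. your Python container) is not where the broker is, requests will fail.
You can read more about Kafka listeners in detail here: https://rmoff.net/2018/08/02/kafka-listeners-explained/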
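
The two-step behaviour described above can be illustrated with a toy model. This is only a sketch of the reasoning, not of Kafka's protocol: REACHES_BROKER and can_consume are hypothetical names, and the table assumes the topology of the compose file in the question (port 9092 published to the host, the broker reachable as kafka inside the Docker network).

```python
# Hostnames that actually lead to the broker, per vantage point (assumed topology).
REACHES_BROKER = {
    "host": {"localhost"},       # published port 9092 forwards to the container
    "parse-engine": {"kafka"},   # Compose service name on the Docker network
}


def can_consume(client, bootstrap_host, advertised_host):
    reachable = REACHES_BROKER[client]
    # Step 1: the bootstrap connection only fetches cluster metadata.
    if bootstrap_host not in reachable:
        return False
    # Step 2: all further traffic goes to the address the broker advertises.
    return advertised_host in reachable


# The question's setup: app in a container, everything pointing at localhost.
print(can_consume("parse-engine", "localhost", "localhost"))
# Fixing only the bootstrap address is not enough while localhost is advertised.
print(can_consume("parse-engine", "kafka", "localhost"))
# Both the bootstrap address and the advertised name point at the kafka service.
print(can_consume("parse-engine", "kafka", "kafka"))
```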
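So to fix your issue, you can do one of two things:
The first option is simply to change your compose file to use the internal hostname for Kafka (KAFKA_ADVERTISED_HOST_NAME: kafka). This means that any client within the Docker network can reach it fine, but no external client can (e.g. from your host machine):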

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
      - "5000:5000"

Your clients would then access the broker at kafka:9092, so your Python app would change to

consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')

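The second option is to add a new listener to Kafka. This makes the broker accessible both from inside and from outside the Docker network. Port 29092 is for access from outside the Docker network (e.g. from your host), and port 9092 is for access from inside it.
You would still need to change your Python program to access Kafka at the correct address. In this case, since it is internal to the Docker network, you would use: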

consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')

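Since I am not familiar with the wurstmeister images, this docker-compose file is based on the Confluent images, which I do know:
(The editor mangled my YAML; you can find it here)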

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines: 
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 29092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup. 
    # If the latter is true, you will need to change the value 'localhost' in 
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those 
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:9092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
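
With two advertised listeners, the bootstrap address a client should use depends on where it runs. The sketch below parses the KAFKA_ADVERTISED_LISTENERS value from the compose file above and picks the matching address; parse_listeners and bootstrap_for are hypothetical helper names introduced for illustration, not part of any Kafka client library.

```python
def parse_listeners(spec):
    """Turn 'NAME://host:port,NAME2://host:port' into {NAME: 'host:port'}."""
    listeners = {}
    for entry in spec.split(','):
        name, address = entry.strip().split('://', 1)
        listeners[name] = address
    return listeners


# Value copied from KAFKA_ADVERTISED_LISTENERS in the compose file above.
ADVERTISED = 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092'


def bootstrap_for(inside_docker_network, spec=ADVERTISED):
    listeners = parse_listeners(spec)
    # PLAINTEXT is the internal listener, PLAINTEXT_HOST the one for the host.
    return listeners['PLAINTEXT' if inside_docker_network else 'PLAINTEXT_HOST']


print(bootstrap_for(True))   # address for a container such as parse-engine
print(bootstrap_for(False))  # address for a client running on the Docker host
```

The returned string can be passed as bootstrap_servers to KafkaConsumer or KafkaProducer.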

Disclaimer: I work for Confluent
