Java cannot connect to Kafka running in Docker (started with a docker-compose file)

Posted by kyvafyod on 2023-08-02 in Java
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - ./.docker-volumes/zookeeper/_data:/var/lib/zookeeper/data
      - ./.docker-volumes/zookeeper/_log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:5.5.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./.docker-volumes/kafka/_data:/var/lib/kafka/data

The docker-compose file above starts ZooKeeper and Kafka instances in Docker.
When I run a service that tries to connect to Kafka, I get the errors below. I searched online but did not find much.

Errors in the service layer:

.12:43:18.730 [kafka-producer-network-thread | producer-2] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Connection to node -1 (localhost/127.0.0.1:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. 
.12:43:19.295 [kafka-producer-network-thread | producer-1] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. 
.12:43:19.481 [kafka-admin-client-thread | vendor_srs_consumer_srs_restaurant_finance_pan_validate_client_id_0] WARN  o.apache.kafka.clients.NetworkClient - [AdminClient clientId=vendor_srs_consumer_srs_restaurant_finance_pan_validate_client_id_0] Connection to node -1 (localhost/127.0.0.1:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

Error in the Docker (broker) logs when the Kafka producer tries to connect to the Kafka broker:

[2021-07-12 07:13:19,492] WARN [SocketServer brokerId=1] Unexpected error from /172.20.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:893)
at kafka.network.Processor.run(SocketServer.scala:792)
at java.lang.Thread.run(Thread.java:748)


I don't understand why the producer cannot connect to the broker. Thanks in advance.

Answer #1 (vaj7vani)

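PLAINTEXT_HOST://0.0.0.0:9092 is not a valid advertised listener; it must be an address that clients can resolve and connect to.
From your error, Connection to node -1 (localhost/127.0.0.1:9092) terminated, it is not clear where this code runs, but if it runs on the same host outside Docker, you should advertise localhost rather than 0.0.0.0.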
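
As a rough illustration of the point about advertised listeners, the sketch below scans a KAFKA_ADVERTISED_LISTENERS value and reports entries whose host is the wildcard 0.0.0.0 or empty. The class and method names are hypothetical and not part of Kafka; the check is deliberately simple and ignores IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Kafka API: flags advertised listeners
// that clients cannot use as a connection target.
final class AdvertisedListenerCheck {

    /** Returns the entries that are malformed or whose host is empty or the wildcard 0.0.0.0. */
    static List<String> unroutable(String advertisedListeners) {
        List<String> bad = new ArrayList<>();
        for (String entry : advertisedListeners.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf("://");
            if (sep < 0) {
                bad.add(trimmed); // no "NAME://host:port" shape at all
                continue;
            }
            String hostPort = trimmed.substring(sep + 3);
            int colon = hostPort.lastIndexOf(':');
            String host = colon >= 0 ? hostPort.substring(0, colon) : hostPort;
            if (host.isEmpty() || host.equals("0.0.0.0")) {
                bad.add(trimmed);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Value taken from the compose file in the question.
        String value = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092";
        System.out.println(unroutable(value)); // prints [PLAINTEXT_HOST://0.0.0.0:9092]
    }
}
```

With PLAINTEXT_HOST://localhost:9092 instead, the returned list is empty.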
Your other error, InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600), suggests that you connected to some other server and got a response the client did not expect.
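
The size in that exception can also be inspected directly. A Kafka request starts with a 4-byte big-endian length, so the broker treats the first four bytes it receives as that length. The sketch below (hypothetical class name, JDK only) splits 369296129 back into bytes. They come out as 16 03 03 01, which matches the start of a TLS handshake record. That is one possible reading, not something the answer above states: it would be consistent with a client using SSL or SASL_SSL against a PLAINTEXT listener.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: interprets the "size" the broker logged as raw bytes.
final class ReceiveSizeDecoder {

    /** The four bytes the broker read as the request length (big-endian). */
    static byte[] sizeBytes(int size) {
        return ByteBuffer.allocate(4).putInt(size).array();
    }

    /** Hex rendering such as "16 03 03 01". */
    static String hex(int size) {
        StringBuilder sb = new StringBuilder();
        for (byte b : sizeBytes(size)) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    /** True if the bytes start like a TLS handshake record: type 0x16, major version 0x03. */
    static boolean looksLikeTlsHandshake(int size) {
        byte[] b = sizeBytes(size);
        return b[0] == 0x16 && b[1] == 0x03;
    }

    public static void main(String[] args) {
        int size = 369296129; // value from the broker log in the question
        System.out.println(hex(size) + " -> TLS handshake? " + looksLikeTlsHandshake(size));
        // prints: 16 03 03 01 -> TLS handshake? true
    }
}
```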

Answer #2 (7d7tgy0s)

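Initially the containers are on the default bridge network, where you have to use container IPs to communicate between them.
You can try deploying both containers on the same network.
Add a network definition to the docker-compose file: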

networks:
  default:
    external:
      name: zookeeper-kafka
  zookeeper-kafka:
    driver: bridge

In each service definition you can then add the network name and the ports.
For example, for the zookeeper service:

zookeeper:
    networks:
    - zookeeper-kafka
    image: confluentinc/cp-zookeeper:5.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - ./.docker-volumes/zookeeper/_data:/var/lib/zookeeper/data
      - ./.docker-volumes/zookeeper/_log:/var/lib/zookeeper/log


You can then reach the zookeeper service at zookeeper-kafka:2181 (i.e. network-name:port) when connecting from Kafka.
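
Which names actually resolve on the compose network is easy to verify from inside a container. The sketch below is a hypothetical JDK-only check, not part of the answer. Note that Docker's embedded DNS on user-defined networks resolves service names, container names and network aliases, so the service name zookeeper would normally be the one that resolves, and a network name such as zookeeper-kafka generally would not.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: reports whether a host name resolves from where it runs.
final class NameCheck {

    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Run this inside a container attached to the compose network.
        for (String host : new String[] {"zookeeper", "zookeeper-kafka"}) {
            System.out.println(host + " resolves: " + resolves(host));
        }
    }
}
```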
Example of the updated Kafka service definition:

kafka:
    networks:
    - zookeeper-kafka
    image: confluentinc/cp-kafka:5.5.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-kafka:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://zookeeper-kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./.docker-volumes/kafka/_data:/var/lib/kafka/data


Hope this helps.

Answer #3 (6pp0gazn)

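I ran into the same problem. We use a Kafka container to run functional tests that exercise producers or consumers.
Below is a snippet from docker-compose.yml that provides the containers for ZooKeeper, Kafka and the schema registry: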

zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    healthcheck:
      test: nc -z zookeeper 2181
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      # Default 8080 port clashes with existing services
      KAFKA_OPTS: '-Dzookeeper.admin.serverPort=3181'

kafka:
    image: confluentinc/cp-kafka:6.1.0
    hostname: kafka
    container_name: kafka
    healthcheck:
      test: nc -z kafka 29092
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "29092:29092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1

schema-registry:
    image: confluentinc/cp-schema-registry:6.1.0
    hostname: schema-registry
    container_name: schema-registry
    healthcheck:
      test: nc -z schema-registry 8081
    restart: always
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Now, in the Kafka container definition you can see:

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT


The security protocol exposed by the container is PLAINTEXT.
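
To make the mapping concrete, here is a minimal sketch that parses a KAFKA_LISTENER_SECURITY_PROTOCOL_MAP value into listener name and protocol pairs and compares a client's security.protocol against one listener. The class and method names are hypothetical; Kafka itself does this matching implicitly when the connection is made.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: reads the listener-to-protocol map used in the compose file.
final class ListenerProtocolMap {

    /** Parses "NAME:PROTOCOL,NAME:PROTOCOL" into an ordered map. */
    static Map<String, String> parse(String value) {
        Map<String, String> map = new LinkedHashMap<>();
        for (String pair : value.split(",")) {
            String[] parts = pair.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed entry: " + pair);
            }
            map.put(parts[0].trim(), parts[1].trim());
        }
        return map;
    }

    /** True if the client's security.protocol equals the protocol of the named listener. */
    static boolean clientMatches(String mapValue, String listener, String clientProtocol) {
        return clientProtocol.equals(parse(mapValue).get(listener));
    }

    public static void main(String[] args) {
        String map = "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT";
        System.out.println(clientMatches(map, "PLAINTEXT_HOST", "SASL_SSL"));  // prints false
        System.out.println(clientMatches(map, "PLAINTEXT_HOST", "PLAINTEXT")); // prints true
    }
}
```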
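In my case, however, the problem was in the application.yaml file. Below is the Kafka producer configuration from application.yaml; note that the security protocol it specifies is SASL_SSL.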

kafka:
        producer:
          bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:}
          key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        properties:
          schema.registry.url: ${KAFKA_SCHEMA_REGISTRY_URL:}
          sasl:
            jaas:
              config: org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_CLUSTER_API_KEY:}' password='${KAFKA_CLUSTER_SECRET:}';
            mechanism: PLAIN
          basic:
            auth:
              user:
                info: ${KAFKA_SCHEMA_REGISTRY_API_KEY:}:${KAFKA_SCHEMA_REGISTRY_SECRET:}
              credentials:
                source: USER_INFO
          security:
            protocol: SASL_SSL


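So, to run my functional tests, I had to override this security protocol to PLAINTEXT. The override is needed only for functional tests; in the production configuration, SASL_SSL is correct.
We can override it by setting an environment variable named SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL, which is equivalent to the following (Spring resolves this mapping implicitly):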

spring:
    kafka:
       properties:
          security:
              protocol: PLAINTEXT


As a result, in functional-test mode the application can connect to the Kafka container by overriding the security protocol from SASL_SSL to PLAINTEXT.
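
The environment variable name follows Spring Boot's documented convention for binding from environment variables: replace dots with underscores, remove dashes, and convert to upper case. The sketch below only reproduces that naming rule for illustration; the class name is hypothetical and this is not Spring's own implementation.

```java
import java.util.Locale;

// Hypothetical helper: derives the environment variable name for a Spring property.
final class RelaxedEnvName {

    /** Applies the rule: '.' becomes '_', '-' is removed, result is upper-cased. */
    static String toEnvVar(String property) {
        return property.replace(".", "_")
                       .replace("-", "")
                       .toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvVar("spring.kafka.properties.security.protocol"));
        // prints SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
    }
}
```

In a test setup this means exporting SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT before starting the application, leaving application.yaml untouched.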
