带有nestjs错误的Kafka:KafkaJSProtocolError:此服务器没有主持此主题分区

luaexgnf  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(182)

我有一个使用Docker compose运行的Kafka集群:

broker1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker1
    container_name: broker1
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

请注意,KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'(表示Kafka)被配置为自动创建主题。
我的客户端配置:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092']
        }
      },
      consumer: {
        groupId: 'my-consumer-group',
        allowAutoTopicCreation: true
      }
    }
)

对于消费特定主题,我使用@EventPattern装饰器。
我期望应用程序将创建我的消费者正在消费的主题,但我得到了这个错误:

ERROR [Connection] Response Meadata(key: 3, version: 6) {"timestamp": "2023-08-09T10:20:17.462Z", "logger": "kafkajs", "broker": "localhost:9092", "clientId": "nestjs-consumer-server", "error": "This server does noy host this topic-partition", "correlationId": 7, "size": 371"}

KafkaJSProtocolError: This server does not host this topic-partition
....

需要说明的一点是,相同的代码和配置适用于不同的Kafka堆栈(我只是没有关于最后一个堆栈的特定图像和版本的所有信息)
我不明白问题是出在Kafka的配置上,还是Kafka不允许消费者自动创建主题,还是出在nest上?
编辑:
我发现应用程序每次创建两个主题,然后失败,所以我认为这意味着Kafka被正确配置为自动主题创建,问题出在nest application或kafkajs中。

2w3kk1z5

2w3kk1z51#

这不是一个完美的解决方案,但最终对我起作用的是使用Kafka和zookeeper而不是Kraft,这解决了错误,并使消费者能够自动创建所有主题。但更好的解决方案可能是在消费者尝试消费它们之前创建主题。

相关问题