我有一个使用Docker compose运行的Kafka集群:
broker1:
image: confluentinc/cp-kafka:7.4.0
hostname: broker1
container_name: broker1
depends_on:
- controller
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker'
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
controller:
image: confluentinc/cp-kafka:7.4.0
hostname: controller
container_name: controller
ports:
- "9093:9093"
- "9102:9102"
environment:
KAFKA_NODE_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9102
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
请注意,KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
(表示Kafka)被配置为自动创建主题。
我的客户端配置:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092']
}
},
consumer: {
groupId: 'my-consumer-group',
allowAutoTopicCreation: true
}
}
)
对于消费特定主题,我使用@EventPattern
装饰器。
我期望应用程序将创建我的消费者正在消费的主题,但我得到了这个错误:
ERROR [Connection] Response Meadata(key: 3, version: 6) {"timestamp": "2023-08-09T10:20:17.462Z", "logger": "kafkajs", "broker": "localhost:9092", "clientId": "nestjs-consumer-server", "error": "This server does noy host this topic-partition", "correlationId": 7, "size": 371"}
KafkaJSProtocolError: This server does not host this topic-partition
....
需要说明的一点是,相同的代码和配置适用于不同的Kafka堆栈(我只是没有关于最后一个堆栈的特定图像和版本的所有信息)
我不明白问题是出在Kafka的配置上,还是Kafka不允许消费者自动创建主题,还是出在nest上?
编辑:
我发现应用程序每次创建两个主题,然后失败,所以我认为这意味着Kafka被正确配置为自动主题创建,问题出在nest application或kafkajs中。
1条答案
按热度按时间2w3kk1z51#
这不是一个完美的解决方案,但最终对我起作用的是使用Kafka和zookeeper而不是Kraft,这解决了错误,并使消费者能够自动创建所有主题。但更好的解决方案可能是在消费者尝试消费它们之前创建主题。