我有一个简单的kafka节点consumergroup设置,如下所示-
'use strict';
const kafka = require('kafka-node');
const ConsumerGroup = kafka.ConsumerGroup;
const topic = 'topic-name';
const kafkaHost = 'broker:9092';
const consumerOptions = {
kafkaHost: kafkaHost,
groupId: 'kafka-node',
sslOptions: { rejectUnauthorized: false },
sasl:{ mechanism: 'plain', username: process.env.SASL_USERNAME, password: process.env.SASL_PASSWORD },
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
};
console.log("Connecting to Kafka on :", kafkaHost);
const consumerGroup = new ConsumerGroup(consumerOptions, topic);
consumerGroup.on('message', function (message) { console.log('Message >>> \n', message)});
consumerGroup.on('error', (error) => { console.log('Error >>> \n', error)});
当我运行此代码时,它正确地订阅了在分区0上发布的消息,而没有读取发布到任何其他分区的消息。有什么问题吗?我遵循这里提到的例子
暂无答案!
目前还没有任何答案,快来回答吧!