脚本:
im设置自动提交为假,
生成12条消息
吃掉他们(从偏移量100开始)
关闭耗电元件
创建新的消费者
在那个阶段,我希望第二个消费者从偏移量100开始再次读取所有消息(因为没有提交)
但是当生成新消息时,我看到第二个消费者从新的偏移量(113)开始,即提交仍然以某种方式发生。。
我做错什么了?
这是我的消费代码
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['192.168.14.10:9095']
});
const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
// admin
await admin.connect();
// Consuming
await consumer.connect();
await consumer.subscribe({ topic: 'topic-test2'});
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString()
});
}
}
});
};
run().catch(console.error);
暂无答案!
目前还没有任何答案,快来回答吧!