我使用高级生产者和高级消费者来发送和接收消息。highlevelconsumer配置为autocommit=false,因为我只想在成功生成消息时提交消息。问题是,第一条信息从来没有真正得到承诺。
例子:
发送消息1-10。
接收消息1
接收消息2
提交消息2
...
接收消息10
提交消息10
提交消息1
如果我重新启动我的消费者,从1到10的所有消息都会被再次处理。只有当我向消费者发送新消息时,旧消息才会被提交。任何数量的消息都会发生这种情况。
我的代码如下:
var kafka = require('kafka-node'),
HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client("localhost:2181/");
consumer = new HighLevelConsumer(
client,
[
{ topic: 'mytopic' }
],
{
groupId: 'my-group',
id: "my-consumer-1",
autoCommit: false
}
);
consumer.on('message', function (message) {
console.log("consume: " + message.offset);
consumer.commit(function (err, data) {
console.log("commited:" + message.offset);
});
console.log("consumed:" + message.offset);
});
process.on('SIGINT', function () {
consumer.close(true, function () {
process.exit();
});
});
process.on('exit', function () {
consumer.close(true, function () {
process.exit();
});
});
var messages = 10;
var kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client("localhost:2181/");
var producer = new HighLevelProducer(client, { partitionerType: 2, requireAcks: 1 });
producer.on('error', function (err) { console.log(err) });
producer.on('ready', function () {
for (i = 0; i < messages; i++) {
payloads = [{ topic: 'mytopic', messages: "" }];
producer.send(payloads, function (err, data) {
err ? console.log(i + "err", err) : console.log(i + "data", data);
});
}
});
我是做错了什么还是Kafka的一个错误?
1条答案
按热度按时间5cg8jx4n1#
消息2的提交是消息1的隐式提交。
由于提交是异步完成的,并且消息1和消息2的提交是在彼此之后快速完成的(即,提交2发生在使用者发送提交1之前),因此第一次提交不会显式发生,只会发送消息2的一次提交。