我有一个kafka队列,自动提交设置为true。我正在寻找一个选项,以设置偏移到以前的位置,以防在处理当前消息时发生错误。比如:
var consumer = new HighLevelConsumer(client, topics, options);
consumer.on('message', function (message) {
console.log('Got msg', message);
var currentOffset = message.offset;
/*
//program logic which caused exception and we want to decrement the offset, so that the same message is picked again
* /
if(errorOccured){
handleErrorCondition(currentOffset);
}
});
//this function sets the offset the current-1 position
function handleErrorCondition(currentOffset){
offset.commit(groupId, [
{ topic: 'test-kafka', partition: 0, offset: (currentOffset-1) }
], function (err, data) {
console.log('Attempted to commit the offset @ '+(currentOffset-1));
var jsnData = JSON.stringify(data);
console.log(err+' - '+jsnData);
});
//consumer.setOffset('test-kafka', 0, (currentOffset -1));//this didnt work either
}
但是这段代码并没有将偏移量设置为“currentoffset-1”,它只是继续读取新消息。其基本意图是通过减小偏移量来重新处理失败的消息。我还尝试将“autocommit”设置为“false”,然后在成功执行代码时显式提交偏移量。在这种情况下,如果代码执行失败(因为偏移量没有增加),则会再次读取消息,但即使这样也不起作用。
我正在用kafkaï2.10(单节点)的nodejs中使用“kafka node”包。
你知道怎么做吗?或者其他解决同样问题的方法?
ps:我尝试了另一种方法,将消息放回队列中,它可以工作,但希望ofset方法可以工作。
暂无答案!
目前还没有任何答案,快来回答吧!