显式减少kafka中的偏移量(用于重试/重新处理kafka消息)

qni6mghb  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(270)

我有一个kafka队列,自动提交设置为true。我正在寻找一个选项,以设置偏移到以前的位置,以防在处理当前消息时发生错误。比如:

var consumer = new HighLevelConsumer(client, topics, options);

consumer.on('message', function (message) {
        console.log('Got msg', message);
        var currentOffset = message.offset;

 /* 
//program logic which caused exception and we want to decrement the offset, so that the same message is picked again

* /

    if(errorOccured){
        handleErrorCondition(currentOffset);
    }

});

//this function sets the offset the current-1 position
function handleErrorCondition(currentOffset){

    offset.commit(groupId, [
        { topic: 'test-kafka', partition: 0, offset: (currentOffset-1) }
    ], function (err, data) {
        console.log('Attempted to commit the offset @ '+(currentOffset-1));
        var jsnData = JSON.stringify(data);
        console.log(err+' - '+jsnData);
    });
//consumer.setOffset('test-kafka', 0, (currentOffset -1));//this didnt work either
}

但是这段代码并没有将偏移量设置为“currentoffset-1”,它只是继续读取新消息。其基本意图是通过减小偏移量来重新处理失败的消息。我还尝试将“autocommit”设置为“false”,然后在成功执行代码时显式提交偏移量。在这种情况下,如果代码执行失败(因为偏移量没有增加),则会再次读取消息,但即使这样也不起作用。
我正在用kafkaï2.10(单节点)的nodejs中使用“kafka node”包。
你知道怎么做吗?或者其他解决同样问题的方法?
ps:我尝试了另一种方法,将消息放回队列中,它可以工作,但希望ofset方法可以工作。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题