kafka节点从上次偏移开始消耗

zu0ti5jz  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(320)

我正在使用kafka节点来使用来自特定kafka主题的消息。当我重新启动我的节点服务器时,它会像预期的那样初始化我的使用者,但是它的默认行为是从偏移量0开始使用,而我的目标是只接收新消息(也称为从当前偏移量开始使用)。我没有从api文档中找到实现这一点的方法。有人知道它是否得到支持吗?
谢谢!

qmelpv7a

qmelpv7a1#

如果只想接收新消息,则必须在创建使用者示例之前设置以下属性:auto.offset.reset=latest

arknldoa

arknldoa2#

相似答案;这将检索每个分区的所有偏移量,并将偏移量设置为最大值减去1,以使用给定主题的最后发布的消息。

var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
    if (err) {
        console.log(`error fetching latest offsets ${err}`)
        return
    }
    var latest = 1
    Object.keys(offsets[topic]).forEach( o => {
        latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
    })
    consumer.setOffset(topic, 0, latest-1)
})
mrphzbgm

mrphzbgm3#

我在kafka节点github问题(link)中问了这个问题并得到了答案。它现在可用(从v0.4.0)。以下代码片段对我很有用:

consumerClient = new kafka.Client('localhost:2181');

/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);

offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
        var latestOffset = data['myTopic']['0'][0];
        console.log("Consumer current offset: " + latestOffset);
});

var consumer = new kafka.HighLevelConsumer(
        consumerClient,
        [
            { topic: 'myTopic', partition: 0, fromOffset: -1 }
        ],
        {
            autoCommit: false
        }
);

干杯!

相关问题