我正在使用kafka节点来使用来自特定kafka主题的消息。当我重新启动我的节点服务器时,它会像预期的那样初始化我的使用者,但是它的默认行为是从偏移量0开始使用,而我的目标是只接收新消息(也称为从当前偏移量开始使用)。我没有从api文档中找到实现这一点的方法。有人知道它是否得到支持吗?谢谢!
qmelpv7a1#
如果只想接收新消息,则必须在创建使用者示例之前设置以下属性:auto.offset.reset=latest
arknldoa2#
相似答案;这将检索每个分区的所有偏移量,并将偏移量设置为最大值减去1,以使用给定主题的最后发布的消息。
var offset = new kafka.Offset(client) offset.fetchLatestOffsets([topic], (err, offsets) => { if (err) { console.log(`error fetching latest offsets ${err}`) return } var latest = 1 Object.keys(offsets[topic]).forEach( o => { latest = offsets[topic][o] > latest ? offsets[topic][o] : latest }) consumer.setOffset(topic, 0, latest-1) })
mrphzbgm3#
我在kafka节点github问题(link)中问了这个问题并得到了答案。它现在可用(从v0.4.0)。以下代码片段对我很有用:
consumerClient = new kafka.Client('localhost:2181'); /* Print latest offset. */ var offset = new kafka.Offset(consumerClient); offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) { var latestOffset = data['myTopic']['0'][0]; console.log("Consumer current offset: " + latestOffset); }); var consumer = new kafka.HighLevelConsumer( consumerClient, [ { topic: 'myTopic', partition: 0, fromOffset: -1 } ], { autoCommit: false } );
干杯!
3条答案
按热度按时间qmelpv7a1#
如果只想接收新消息,则必须在创建使用者示例之前设置以下属性:auto.offset.reset=latest
arknldoa2#
相似答案;这将检索每个分区的所有偏移量,并将偏移量设置为最大值减去1,以使用给定主题的最后发布的消息。
mrphzbgm3#
我在kafka节点github问题(link)中问了这个问题并得到了答案。它现在可用(从v0.4.0)。以下代码片段对我很有用:
干杯!