我想编写一个函数,在读取主题中的最后一条消息后调用回调。
function getCurrentMessages(kafka, topic, cb_done){
// Start consuming from the beginning
var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
consumer.on('message', function(msg){
// Do something with msg
});
consumer.on('final-message-received', function(){
consumer.close(function(){
cb_done();
});
});
}
在当前的库中这可能吗?我不想让消费者打开来接收新消息。
2条答案
按热度按时间pjngdqdw1#
您可以尝试以下方法:
u7up0aaq2#
我想你可以在我的帮助下做这件事。用户端可以方便地获取事件偏移信息。
Kafka为您提供以下补偿:
1早期偏移
2电流偏移和
三。最新偏移量
您只需获取用kafka编写的最后一条消息的最新偏移量或偏移量,并在使用事件时将其与当前偏移量进行比较,这样当您使用来自kafka的最后一条消息时,您的最新偏移量将等于当前偏移量。
你可以写一个if条件来检查
if(当前\u偏移==最新\u偏移)
{
打破你想要的流动或行动;
}
请让我知道如果它不适合你或需要任何进一步的信息。