npm kafka节点:在最终消息读取后关闭使用者

yftpprvb  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(281)

我想编写一个函数,在读取主题中的最后一条消息后调用回调。

function getCurrentMessages(kafka, topic, cb_done){
  // Start consuming from the beginning
  var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
  consumer.on('message', function(msg){
    // Do something with msg
  });
  consumer.on('final-message-received', function(){
    consumer.close(function(){
      cb_done();
    });
  });
}

在当前的库中这可能吗?我不想让消费者打开来接收新消息。

pjngdqdw

pjngdqdw1#

您可以尝试以下方法:

consumer.on('message', function (message) {
        console.log("received message", message);
        if(message.offset == (message.highWaterOffset -1)){
          consumer.close(true, function(err, message) {
            console.log("consumer has been closed..");
          });
        }
 });
u7up0aaq

u7up0aaq2#

我想你可以在我的帮助下做这件事。用户端可以方便地获取事件偏移信息。
Kafka为您提供以下补偿:
1早期偏移
2电流偏移和
三。最新偏移量
您只需获取用kafka编写的最后一条消息的最新偏移量或偏移量,并在使用事件时将其与当前偏移量进行比较,这样当您使用来自kafka的最后一条消息时,您的最新偏移量将等于当前偏移量。
你可以写一个if条件来检查
if(当前\u偏移==最新\u偏移)
{
打破你想要的流动或行动;
}
请让我知道如果它不适合你或需要任何进一步的信息。

相关问题