如何在kafka nodejs中延迟消耗消息?

rjzwgtxy  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(691)

我正在使用nodejs来使用来自kafka的消息,在收到消息后,我将把它带到elasticsearch中创建索引。这是我的代码:

kafkaConsumer.on('message', function (message) {
    elasticClient.index({
        index: 'test',
        type: 'sample',
        body: message
    }, function (error, response) {
        if (error) {

            // Stop consuming message here

            console.log(error);
        }
        console.log(response);
    });
});

我要确保在继续使用下一条消息之前必须成功创建索引,因为我不希望任何消息丢失。

rsaldnfx

rsaldnfx1#

您可以暂停您的消费者,使其不会订阅和获取新消息。一旦你得到elasticclient的回复,你就可以继续使用Kafka了。

kafkaConsumer.pause();
kafkaConsumer.resume();

根据你的设置使用它。

polhcujo

polhcujo2#

试试 consumer.pause()/resume() .
pause()暂停消费者
resume()恢复消费者

相关问题