我正在使用nodejs来使用来自kafka的消息,在收到消息后,我将把它带到elasticsearch中创建索引。这是我的代码:
kafkaConsumer.on('message', function (message) {
elasticClient.index({
index: 'test',
type: 'sample',
body: message
}, function (error, response) {
if (error) {
// Stop consuming message here
console.log(error);
}
console.log(response);
});
});
我要确保在继续使用下一条消息之前必须成功创建索引,因为我不希望任何消息丢失。
2条答案
按热度按时间rsaldnfx1#
您可以暂停您的消费者,使其不会订阅和获取新消息。一旦你得到elasticclient的回复,你就可以继续使用Kafka了。
根据你的设置使用它。
polhcujo2#
试试
consumer.pause()/resume()
.pause()暂停消费者
resume()恢复消费者