我创建了一个连接器,将集合的所有插入/更新事件推送到同一主题(1个分区)。处理insert事件消息的使用者代码将比update事件花费一些时间。
这里的问题是消息不是基于消费顺序提交的。以下是Kafka消费者的步骤和行为。
在集合中插入一条记录,并在一秒钟内更新字段。
该主题现在有2条记录。一个用于插入事件(消息1),另一个用于更新事件(消息2)。
消息1已使用
消息2已使用
消息2已提交
消息1已提交
有什么方法可以让我等到消息1被提交,然后使用消息2。
消费者代码:
const startConsumer = async () => {
// Creating a kafka consumer group.
const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);
kafkaConsumerGroup.on("connect", () => {
console.log("Consumer Group Connected Successfully");
});
// Listening for messages from kafka.
kafkaConsumerGroup.on("message", async (message) => {
//processor code
const data = JSON.parse(message.value);
response = await processData(JSON.parse(data.payload));
//Commit
kafkaConsumerGroup.commit((error, data) => {
if (error) {
console.log(error);
} else {
console.log('Commit success ');
}
});
});
kafkaConsumerGroup.on("error", (error) => {
console.log(error);
});
};
暂无答案!
目前还没有任何答案,快来回答吧!