kakfa使用者正在处理下一条消息,甚至在提交同一主题的第一条消息之前

r8uurelv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(316)

我创建了一个连接器,将集合的所有插入/更新事件推送到同一主题(1个分区)。处理insert事件消息的使用者代码将比update事件花费一些时间。
这里的问题是消息不是基于消费顺序提交的。以下是Kafka消费者的步骤和行为。
在集合中插入一条记录,并在一秒钟内更新字段。
该主题现在有2条记录。一个用于插入事件(消息1),另一个用于更新事件(消息2)。
消息1已使用
消息2已使用
消息2已提交
消息1已提交
有什么方法可以让我等到消息1被提交,然后使用消息2。
消费者代码:

  1. const startConsumer = async () => {
  2. // Creating a kafka consumer group.
  3. const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);
  4. kafkaConsumerGroup.on("connect", () => {
  5. console.log("Consumer Group Connected Successfully");
  6. });
  7. // Listening for messages from kafka.
  8. kafkaConsumerGroup.on("message", async (message) => {
  9. //processor code
  10. const data = JSON.parse(message.value);
  11. response = await processData(JSON.parse(data.payload));
  12. //Commit
  13. kafkaConsumerGroup.commit((error, data) => {
  14. if (error) {
  15. console.log(error);
  16. } else {
  17. console.log('Commit success ');
  18. }
  19. });
  20. });
  21. kafkaConsumerGroup.on("error", (error) => {
  22. console.log(error);
  23. });
  24. };

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题