生产者端的Kafka事务管理

hrysbysz  于 2023-05-28  发布在  Apache
关注(0)|答案(2)|浏览(192)

我正在寻找Kafka在事务中运行生产者时的行为。我有一个oracle数据库插入操作在同一个事务中运行,如果事务回滚,则回滚更改。Kafka生产者在事务回滚的情况下如何行为。
消息是否会回滚或者Kafka不支持回滚。
我知道JMS消息只有在事务提交时才提交到队列中。寻找类似的解决方案,如果它是支持的。
注意:Producer代码是使用Sping Boot 编写的。

oxcyiej7

oxcyiej71#

您正在尝试更新两个系统

  • 更新oracle数据库中的记录
  • 向Apache Kafka发送事件

这是一个挑战,因为你希望它是原子的,要么所有的东西都被执行,要么什么都不执行,否则你最终会得到你的数据库和Kafka之间的不一致。

  • 即使数据库事务被回滚,您也可以发送Kafka消息。
  • 或者反过来(如果你在提交之后发送消息),你可能会在发送Kafka事件之前提交数据库事务并崩溃(出于某种原因)。

最简单的解决方案之一是使用发件箱模式:

  • 假设您想要更新一个订单表并向Kafka发送orderEvent
  • 而不是在同一个事务中将事件发送给Kafka
  • 您可以使用与订单更新相同的事务将其保存为数据库表(发件箱)
  • 一个单独的进程将从发件箱表中读取数据,并确保它被发送到Kafka(使用至少一次语义)
  • 你的消费者必须是幂等的。

在这篇文章中,我将更详细地解释如何实现这个解决方案
https://mirakl.tech/sending-kafka-message-in-a-transactional-way-34d6d19bb7b2

d8tt03nd

d8tt03nd2#

RocketMQ提供了一种称为“事务消息”的机制,并包括一个称为“检查”的功能,用于处理发送到RocketMQ的消息的确认状态。
当生产者发送事务消息时,RocketMQ返回事务ID。执行本地事务时,生产者需要将此事务ID与事务状态(已提交或已回滚)沿着存储在其本地事务日志中。生产者需要周期性地通过向RocketMQ发送检查请求来检查事务的状态。
收到检查请求后,RocketMQ会调用生产者注册的相应检查监听器。检查侦听器然后可以检查本地事务状态,并使用事务的最终确认状态(提交或回滚)响应RocketMQ。基于检查响应,RocketMQ将采取适当的操作来提交或回滚事务。
RocketMQ中的检查机制通过允许生产者确认事务的最终状态来确保事务消息的可靠性和一致性。

相关问题