我们有一个微服务架构,使用kafka作为服务之间的通信机制。有些服务有自己的数据库。假设用户对服务a进行调用,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,该事件应作为Kafka主题的一个项目报告给其他服务机构。只有成功更新了kafka主题(基本上是围绕数据库更新和kafka更新创建分布式事务),才能确保写入数据库记录的最佳方法是什么?
我们正在考虑使用spring-kafka(在spring-boot-webflux服务中),我可以看到它有一个kafkatransactionmanager,但据我所知,这更多的是关于kafka事务本身(确保kafka生产者和消费者之间的一致性),而不是跨两个系统同步事务(请参见此处:“kafka不支持xa,您必须处理db tx可能在kafka tx回滚时提交的可能性。”。此外,我认为这个类依赖于spring的事务框架,至少就我目前所知,它是线程绑定的,如果使用React式方法(例如webflux),操作的不同部分可能在不同的线程上执行,那么它就不起作用(我们使用的是React式pg客户机,因此手动处理事务,而不是使用spring的框架。)
我能想到的一些选择:
不要将数据写入数据库:只将其写入Kafka。然后使用使用者(在服务a中)更新数据库。这似乎不是最有效的,并且会有问题,因为用户调用的服务不能立即看到它应该刚刚创建的数据库更改。
不要直接写Kafka:只写数据库,然后使用debezium之类的工具将更改报告给Kafka。这里的问题是,更改是基于单个数据库记录的,而要存储在kafka中的业务重要事件可能涉及来自多个表的数据的组合。
首先写入数据库(如果失败,什么也不做,只是抛出异常)。然后,在给Kafka写信时,假设写操作可能会失败。使用内置的自动重试功能,让它继续尝试一段时间。如果最终完全失败,请尝试写入死信队列,并为管理员创建某种手动机制来解决问题。如果写入dlq失败(即kafka完全关闭),只需以其他方式将其记录(例如,记录到数据库),然后再次创建某种手动机制供管理员进行排序。
有没有人对上述问题有什么想法或建议,或者有没有人能够纠正我上述假设中的任何错误?
提前谢谢!
3条答案
按热度按时间jxct1oxe1#
上面描述的所有方法都是解决问题的最佳方法,并且都是定义良好的模式。您可以在下面提供的链接中探索这些。
模式:事务发件箱
通过将事件或消息保存在数据库的发件箱中,将其作为数据库事务的一部分发布。http://microservices.io/patterns/data/transactional-outbox.html
模式:轮询发布者
通过轮询数据库中的发件箱来发布消息。http://microservices.io/patterns/data/polling-publisher.html
模式:事务日志跟踪
通过跟踪事务日志来发布对数据库所做的更改。http://microservices.io/patterns/data/transaction-log-tailing.html
piok6c0g2#
我建议使用方法2的一个稍微改变的变体。
只写入数据库,但除了实际的表写入外,还要将“事件”写入同一数据库中的特殊表中;这些事件记录将包含所需的聚合。最简单的方法是,只需插入另一个实体,例如由jpaMap的实体,该实体包含一个带有聚合负载的json属性。当然,这可以通过事务侦听器/框架组件的某种方式实现自动化。
然后使用debezium从表中捕获更改并将其流式传输到kafka中。这样,您就拥有了两种情况:kafka中的最终一致状态(kafka中的事件可能落后,或者重新启动后第二次您可能会看到一些事件,但最终它们将反映数据库状态),而不需要分布式事务,以及您所追求的业务级事件语义。
(免责声明:我是debezium的负责人;有趣的是,我正在写一篇博文,更详细地讨论这种方法)
这些是帖子
https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/
https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/
7d7tgy0s3#
首先,我不得不说,我不是Kafka,也不是springMaven,但我认为在编写独立资源时,这更像是一个概念上的挑战,解决方案应该适合您的技术堆栈。此外,我应该说,这个解决方案试图在没有像debezium这样的外部组件的情况下解决这个问题,因为在我看来,每个额外的组件都会给测试、维护和运行应用程序带来挑战,而在选择这样的选项时,这些挑战常常被低估。也不是每个数据库都可以用作debezium源。
为了确保我们谈论的是相同的目标,让我们用一个简化的航空公司示例来说明情况,在这个示例中,客户可以购买机票。订单成功后,客户将收到由外部消息传递系统(我们必须与之交谈的系统)发送的消息(邮件、推送通知等)。
在传统的jms世界中,在我们的数据库(存储订单的地方)和jms提供者之间有一个xa事务,它看起来是这样的:客户机为我们启动事务的应用程序设置订单。应用程序将订单存储在其数据库中。然后消息被发送到jms,您可以提交事务。两个操作都参与事务,即使它们在与自己的资源进行通信。因为xa事务保证我们没事。
让我们把kafka(或者任何其他不能参与xa事务的资源)带到游戏中。由于没有协调器来同步这两个事务,下面的主要思想是用持久状态将处理分为两部分。
当您将订单存储在数据库中时,您还可以将消息(连同聚合数据)存储在您希望随后发送给kafka的同一数据库中(例如,作为clob列中的json)。同样的资源-酸保证,目前一切正常。现在,您需要一种机制来轮询“kafkatasks”——表中的新任务,这些任务应该发送到kafka主题(例如,使用计时器服务,可能可以在spring中使用@scheduled annotation)。消息成功发送到kafka后,您可以删除任务条目。这样可以确保只有在订单也成功地存储在应用程序数据库中时,才会向kafka发送消息。我们是否实现了与使用xa事务时相同的保证?不幸的是,没有,因为仍然有可能写信给Kafka工作,但删除任务失败。在这种情况下,重试机制(您需要一个问题中提到的机制)将重新处理任务并发送消息两次。如果您的业务案例对此“至少一次”感到满意,那么可以保证您在这里使用了imho半复杂的解决方案,该解决方案可以很容易地作为框架功能实现,因此不是每个人都需要关注细节。
如果需要“恰好一次”,则不能将状态存储在应用程序数据库中(在本例中,“删除任务”是“状态”),而是必须将其存储在kafka中(假设两个kafka主题之间有acid保证)。例如:假设表中有100个任务(ids1到100),任务作业处理前10个任务。你把你的Kafka信息写在他们的主题上,另一条id为10的信息写在“你的主题”上。都在同一个Kafka交易。在下一个循环中,您使用您的主题(值为10),并使用此值来获取下10个任务(并删除已处理的任务)。
如果有更简单(在应用中)的解决方案与相同的保证,我期待着听到你!
很抱歉回答得太长了,但我希望能有所帮助。