我正在使用kafka流和数据库开发一个应用程序。
在我的应用程序中,我手动管理kafka偏移量,并仅在成功处理消息时提交偏移量(即在成功处理并更新到db之后)。
但是,如果在更新db之后,我的应用程序在提交之前停止,那么当它返回时,由于未提交的偏移量,会导致对db的重复写入。
我想避免这些重复,同时仍然确保我正在处理每一条消息。正确的方法是什么?
编辑:我对db的更新基本上是将一个记录的计数器增加一个值。所以合并语句不是一个选项。
我正在使用kafka流和数据库开发一个应用程序。
在我的应用程序中,我手动管理kafka偏移量,并仅在成功处理消息时提交偏移量(即在成功处理并更新到db之后)。
但是,如果在更新db之后,我的应用程序在提交之前停止,那么当它返回时,由于未提交的偏移量,会导致对db的重复写入。
我想避免这些重复,同时仍然确保我正在处理每一条消息。正确的方法是什么?
编辑:我对db的更新基本上是将一个记录的计数器增加一个值。所以合并语句不是一个选项。
1条答案
按热度按时间pvcm50d11#
这有点棘手。
Kafka只支持一次语义。但是,当您将数据写入外部数据存储时,您需要确保用户端的数据只有一次。
实现这一点的一种方法(正如jaykreps在这里提出的那样)是将数据存储中的kafka偏移量作为单个事务的一部分进行维护。因此,如果保持每个分区的最后一个偏移量,则当接收到的偏移量小于数据库中存储的偏移量时,始终可以忽略来自给定分区的消息。
然而,这种方法有一个警告。如果您有一个多数据中心主动-主动部署,如果主集群崩溃,使用者将回退到不同的数据中心集群,则不能盲目地依赖偏移量。偏移量是一个物理标识,一个集群中消息的偏移量可以不同于另一个集群中复制消息的偏移量。
在这种情况下,我认为正确的方法是利用kafka流并在kafka表中维护计数(
KTable
)存储在压缩的Kafka主题中。kafka内部将使用producer id、epoch、transaction id等来确保语义的精确性。