apache flink-作业部署期间的重复消息处理,使用activemq作为源

8ehkhllq  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(495)

鉴于,
我有一份工作 ActiveMQ source&写入mysql数据库-输入标识符。我已为此作业每秒钟启用一次检查点。我把检查站指向一个 Minio 例如,我验证了检查点是否与 jobid . 我部署这个作业是一个openshift(kubernetes在下面)-我可以根据需要放大/缩小这个作业。
问题
当作业部署(滚动)或作业由于错误/错误而停止时,如果activemq中有任何未使用的消息或flink中有任何未确认的消息(但已写入数据库),当作业恢复(或部署新作业)时,作业进程已处理的消息会导致在数据库中插入重复的记录。
问题
检查点不应该帮助工作从它离开的地方恢复吗?
在部署新作业之前,我应该先检查一下吗?
如果作业因错误或群集故障而退出,会发生什么情况?
作为 jobid 每次部署都会不断变化,恢复是如何进行的?
编辑,因为我不能期望从数据库的幂等性,以避免重复保存到数据库中( Exactly-Once ),是否可以编写特定于数据库的( upsert )查询以更新给定记录是否存在,如果不存在则插入?

9rygscc1

9rygscc11#

jdbc目前只支持至少一次,这意味着您在恢复时会收到重复的消息。目前有一个草案,以增加支持一次,这可能会与1.11发布。
检查点不应该帮助工作从它离开的地方恢复吗?
是的,但是最后一次成功的检查点和恢复之间的时间可能会产生观察到的重复项。我就一个有点相关的主题作了更详细的回答。
在部署新作业之前,我应该先检查一下吗?
当然。实际上,您应该将cancel与savepoint一起使用。这是改变拓扑结构的唯一可靠方法。另外,cancel with savepoints可以避免数据中的任何重复,因为它可以优雅地关闭作业。
如果作业因错误或群集故障而退出,会发生什么情况?
它应该自动重新启动(取决于您的重新启动设置)。它将使用最新的检查点进行恢复。这肯定会导致重复。
由于每次部署时jobid都在不断变化,恢复是如何进行的?
通常显式指向同一个检查点目录(在s3?)。
因为我不能期望数据库的幂等性,所以upsert是实现一次处理的唯一方法吗?
目前,我还没有找到解决的办法。应该改为1.11。

相关问题