祝大家节日快乐!
热释光;dr:我需要聚合存储在一个dynamodb表中的电影租赁信息,并将聚合的运行总数存储在另一个表中。如何确保一次聚合?
我当前将电影租赁信息存储在名为movierentals的dynamodb表中:{movie\u title,rent\u period\u in\u days,order\u date,rent\u amount}
我们每天都有数百万的电影出租。我们的web应用程序需要显示任何给定电影标题的合计租金金额。
我计划使用flink在movierental dynamodb流上按电影标题聚合租金金额,并将聚合租金金额存储在另一个名为rentalamountsbymovie的dynamodb表中:{movie\u title,total\u rental\u amount}
如何确保rentalamountsbymovie金额始终准确。i、 e.如何防止来自任何检查点的结果多次不更新rentalamountsbymovie表记录?
方法1:我将检查点id存储在rentalamountsbymovie表中,并进行条件更新以处理上述场景?
方法2:我可以实现使用dynamodb事务的twophasecommitsinkfunction。然而,根据flink文档,commit函数可以被多次调用,因此需要是幂等的。因此,即使这个解决方案也需要将检查点id存储在目标数据存储中。
方法3:另一种模式似乎只是将时间窗口聚合结果存储在rentalamountsbymovie表中:{movie\u title,rent\u amount\u for\u checkpoint,checkpoint\u id}。这样,从flink到dynamodb的写操作将是幂等的(flink没有做任何更新,它只做对目标ddb表的插入)。但是,webapp必须通过聚合rentalamountsbymovie表的结果来计算运行总数。我不喜欢这个解决方案,因为它对webapp的延迟影响。
方法4:也许我可以使用flink的可查询状态特性。不过,该功能似乎还在测试阶段:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html
我想这是一个非常常见的聚合用例。人们通常如何处理flink外部接收器中的聚合结果更新?
我很感激你的指点。如果需要,我们很乐意提供更多细节。
谢谢!
1条答案
按热度按时间gpfsuwkq1#
通常您关心的问题不是问题,因为人们正在使用幂等写来捕获外部接收器中的聚合结果。
你可以依靠flink,在flink的内部状态下,总是能获得rentalamountsbymovie的准确信息。之后,只需将这些信息镜像到dynamodb。
一般来说,如果你的汇是幂等的,那么事情就变得非常简单了。flink中保存的状态将由指向输入的某种指针(例如偏移量或时间戳)与消耗输入到该点所产生的聚合组成。你将需要引导状态;这可以通过处理所有历史数据来完成,或者通过使用状态处理器api来创建一个保存点来建立一个起始点。