快照机制可能会在apache flink中花费越来越多的内存吗

t8e9dugd  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(400)

我在学习flink的快照机制。
据我所知,jobmanager将以固定的时间间隔将屏障插入每个数据源,并且每个操作员将在从其所有数据源接收到第n个屏障后进行快照。
如果我是对的,这种机制在某些情况下可能会使用越来越多的记忆。
举个例子:
说有两个数据来源: Source 1 以及 Source 2 ,和一个运算符。

Source 1 -----\
               ------ Operator
Source 2 -----/
``` `Source 1` 正在生成整数流:1、2、3、4、5。。。 `Source 2` 正在生成字符流:a、b、c、d、e。。。
运算符执行此操作:它从 `Source 1` 一个输入来自 `Source 2` 要生成输出:1a2、3b4、5c6、7d8。。。
假设jobmanager将屏障插入到两个数据源中,如下所示:

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...

现在让我们开始。
当两个“屏障”的 `Source 1` 以及 `Source 2` 进入操作符后,flink会为当前状态为的操作符创建一个快照 `1` 以及 `a` ,因为 `1` 以及 `a` 当障碍物a进入操作员时,已进入操作员。
然后,当两个“屏障b”进入操作符时,操作符就完成了它的第一个任务:生成 `1a2` ,flink将制作另一个快照: `NA` ,  `b` .  `NA` 表示当前没有来自 `Source 1` .
同时,每个快照都将存储到ram、fs或rocksdb中(取决于我们如何配置flink)。
如果我是对的,我认为flink会在这个例子中生成越来越多的快照。因为 `Source 1` 总是两倍于 `Source 2` .
我误解了什么吗?
eqoofvh9

eqoofvh91#

有趣的思维实验。
如果您将自己限制为只使用flinkapi的标准部分,那么就无法实现一个用户函数,该函数将为从源2读取的每个输入从源1读取两个输入。在实施 CoProcessFunction 例如,您受flink运行时的支配,它将根据自己的内部逻辑从任一流提供事件。这两个流将相互竞争,可能在不同的线程中运行,甚至在不同的进程中运行。当流聚合时,如果来自两个输入的事件没有按您希望的顺序提供,则必须在flink状态下缓冲它们,直到您准备好处理它们为止。
这可能导致大量缓冲需求的常见情况是在实现事件时间连接时,其中一个流在其时间戳方面遥遥领先于另一个流(例如,在外汇汇率上加入金融交易,使用交易时有效的汇率,如果汇率流滞后)。但是这种缓冲可以在rocksdb中完成,并且不必对内存施加压力。
请注意,这种状态缓冲完全发生在您的应用程序中——flink没有灵活的网络缓冲区,这些缓冲区在背压期间会膨胀。
另一点是快照从不存储在本地文件系统或rocksdb中。如果选择使用rocksdb state后端,则每个任务管理器的活动、工作状态将存储在本地rocksdb示例中,但状态备份(快照)将存储在分布式文件系统中。
至于你这样描述的情况,

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, 5...

这不会发生的。没有什么能安排这两个源以这种方式同步——它们将比这张图所显示的要独立得多。因为flink在管道级之间只有少量的固定数量的网络缓冲,所以执行图中出现的任何反压力都会迅速传播回一个或两个源。当这种情况发生时,被背压的源头将无法将任何事件推入管道,直到背压缓解——但与此同时,另一个源头可能会继续取得进展。屏障将由两个源在大致相同的时间独立地插入两个流中,但是如果源2经历频繁的背压(例如),它可能看起来更像这样:

1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...

相关问题