问题定义与概念建立
假设我们有一个5分钟大小的tumblingeventtimewindow。我们有包含两个基本信息的事件:
数
事件时间戳
在这个例子中,我们在12:00 pm工作者机器的挂钟时间启动flink拓扑(当然工作者可以有不同步的时钟,但这超出了这个问题的范围)。此拓扑包含一个处理运算符,其职责是汇总属于每个窗口的事件值,以及一个与此问题无关的kafka接收器。
此窗口具有BoundedAutoFordernessTimestampExtractor,允许延迟一分钟。
水印:据我所知,flink和spark结构化流中的水印定义为(到目前为止看到的最大事件时间戳-允许延迟)。任何事件时间戳小于或等于此水印的事件都将在结果计算中被丢弃和忽略。
第1部分(确定窗口边界)
快乐(实时)路径
在这个场景中,几个事件到达flink操作符,并具有不同的事件时间戳 12:01 - 12:09
. 此外,事件时间戳与我们的处理时间相对一致(如下面的x轴所示)。因为我们处理的是事件的时间特性,所以偶数是否属于某个特定的事件应该通过它的事件时间戳来确定。
在此处输入图像描述
旧数据涌入
在这个流程中,我假设两个滚动窗口的边界是 12:00 -- 12:05
以及 12:05 -- 12:10
就因为我们已经在12点开始执行拓扑了。如果这个假设是正确的(我希望不是),那么在一些旧事件以更旧的事件时间戳出现并且我们在12:00再次启动拓扑的情况下会发生什么呢(年龄大到我们的迟到津贴不包括他们)。如下所示:
在此处输入图像描述
如果是这样的话,那么我们的事件将不会在任何窗口中被捕获,当然,所以,我希望这不是行为:)
另一个选择是通过到达事件的事件时间戳来确定windows的边界。如果是这样,那怎么办?注意到的最小事件时间戳成为第一个窗口的开始,然后根据大小(在本例中为5分钟)确定随后的边界?因为这种方法也会有缺陷和漏洞。你能解释一下这是如何工作的,以及Windows的边界是如何确定的吗?
事件纷至沓来
对上一个问题的回答也将解决这个问题,但我认为在这里明确地提到这一点是有帮助的。假设我有一个5分钟大小的翻滚窗。然后在12:00,我开始了一项回填工作,这项工作在很多情况下都会被送到flink操作员那里,他的时间戳覆盖了整个范围 10:02 - 10:59
; 但由于这是一个回填作业,整个执行过程大约需要3分钟才能完成。
作业是否会分配12个单独的窗口,并根据事件的事件时间戳正确填充它们?那12扇Windows的边界是什么?最后会有12个输出事件,每个事件都有每个分配窗口的总和吗?
第2部分(此类有状态运算符的单元/集成测试)
我还对此类逻辑和运算符的自动测试有一些担心。操纵处理时间的最佳方法是,触发某些行为,从而形成所需窗口的边界,以便进行测试。特别是我读到的关于杠杆的东西 Test Harnesses
似乎有点混乱,可能会导致一些不太容易阅读的混乱代码:
单元测试有状态运算符
flink中窗口的延迟测试
参考文献
我在这一领域学到的大部分知识和一些困惑的根源可以在以下地方找到:
timestmap提取器和水印发射器
事件时间处理与水印
spark中后期数据处理与水印技术
spark doc那部分的图片非常有帮助和教育意义。但与此同时,windows的边界与那些处理时间而不是事件时间戳对齐的方式,给我带来了一些困惑。
而且,在这种可视化中,水印似乎每5分钟计算一次,因为这是窗口的滑动规范。这是计算水印频率的决定性因素吗?对于不同的窗口(例如。 Tumbling
, Sliding
, Session
更多)?!
非常感谢您的帮助,如果您知道任何更好的参考,关于这些概念和他们的内部工作,请让我知道。
以下@snntrable answer后更新
如果使用事件时间语义运行作业,则窗口操作符的处理时间完全不相关
这是正确的,我理解这一部分。一旦你在处理 EVENT_TIME
在语义/逻辑上,你几乎脱离了处理时间。我之所以提到处理时间,是因为我对以下关键问题感到困惑,这对我来说仍然是个谜:
Windows的边界是怎么计算的?!
另外,非常感谢您澄清 out-of-orderness
以及 lateness
. 我正在处理的代码由于使用了一个错误的名称(继承自 BoundedOutOfOrdernessTimestampExtractor
被命名为 maxLatency
) :/
考虑到这一点,让我看看我是否能够正确地计算水印以及何时丢弃事件(或侧输出):
无序赋值器
当前水印= max-event-time-seen-so-far - max-out-of-orderness-allowed
允许迟到
当前水印= max-event-time-seen-so-far - allowed-lateness
规则流
当前水印= max-event-time-seen-so-far
在这些情况下,无论事件时间戳小于或等于 current-watermark
,将被丢弃(侧输出),对吗?!
这就提出了一个新的问题。你想什么时候用 out of orderness
相对于 lateness
? 因为在这些情况下,当前的水印计算(数学上)可以是相同的。当你同时使用这两种方法时会发生什么(这有意义吗)?!
回到windows的边界
对我来说,这仍然是主要的谜团。考虑到上面的所有讨论,让我们重温一下我提供的具体示例,看看这里如何确定窗口的边界。假设我们有以下场景(事件的形状为 (value, timestamp)
):
操作员在中午12:00开始工作(这是处理时间)
事件按以下顺序到达操作员
(1, 8:29)
(5, 8:26)
(3, 9:48)
(7, 9:46)
我们有一个5分钟大小的折叠窗
窗口应用于 DataStream
与 BoundedOutOfOrdernessTimestampExtractor
有2分钟 maxOutOfOrderness
此外,窗口配置为 allowedLateness
1分钟
注意:如果你不能两者兼得 out of orderness
以及 lateness
或没有意义,请只考虑 out of orderness
在上面的例子中。
最后,你能不能布置一些分配了事件的窗口,请指定这些窗口的边界(窗口的开始和结束时间戳)。我假设边界也是由事件的时间戳决定的,但是要在像这样的具体例子中弄清楚它们有点棘手。
再次感谢您的帮助:)
1条答案
按热度按时间mctunoxg1#
原始答案
水印:据我所知,flink和spark结构化流中的水印定义为(到目前为止看到的最大事件时间戳-允许延迟)。任何事件时间戳小于或等于此水印的事件都将在结果计算中被丢弃和忽略。
这是不正确的,可能是造成混淆的原因。失序和迟到在《Flink》中是两个不同的概念。与
BoundedOutOfOrdernessTimestampExtractor
水印是max-event-timestamp-seen-so-far - max-out-of-orderness
. 关于flink文档中允许的延迟的更多信息[1]。如果使用事件时间语义运行作业,则窗口操作符处的处理时间完全不相关:
事件将根据其事件时间戳分配给其窗口
一旦水印达到其最大时间戳,就会触发时间窗口(
window end time -1
).时间戳早于的事件
current watermark - allowed lateness
被丢弃或发送到最新的数据端输出[1]这意味着,如果您在12:00pm(处理时间)开始一个作业并开始接收过去的数据,则水印也将(甚至更远)在过去。所以,这个
allowedLateness
是不相关的,因为数据不是关于偶数时间的延迟。另一方面,如果您首先从12:00pm摄取一些数据,然后从10:00pm摄取一些数据,则在摄取旧数据之前,水印将已经提前到12:00pm。在这种情况下,从晚上10:00开始的数据将是“迟到的”。如果它晚于配置的
allowedLateness
(默认值=0)丢弃(默认值)或发送到侧输出(如果配置)[1]。后续答案
事件时间窗口的时间线如下所示:
创建窗口到达->状态中第一个带有时间戳的元素(&key)
watermark >= window_endtime - 1
到达->窗口被触发(结果被发出),但状态不会被丢弃watermark >= window_endtime + allowed_latenes
到达->状态被丢弃在2之间。和3。此窗口的事件延迟,但在允许的延迟范围内。这些事件被添加到现有状态,并且(默认情况下)在每个记录上触发窗口,从而发出一个优化的结果。
三点以后。此窗口的事件将被丢弃(或发送到延迟输出接收器)。
所以,是的,两者都配置是有意义的。无序性决定了何时第一次触发窗口,而允许的延迟性则决定了状态保持多长时间以可能更新结果。
关于边界:翻滚事件时间窗口有一个固定的长度,跨键和开始对齐