ApacheFlink—操作符示例的事件时间是否可能变小?

3lxsmp7m  于 2021-06-21  发布在  Flink
关注(0)|答案(3)|浏览(213)

从以下url中的“并行流中的水印”部分,我们知道“操作符的当前事件时间是其输入流的事件时间的最小值”https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/event_time.html
现在我们以window(1)示例的事件时间为例,我们知道事件时间是14(min(29,14)),但是如果下面的序列水印事件发生了呢?
如果水印事件29在水印事件14之前到达窗口(1),会发生什么情况?
例如假设水印事件29首先到达窗口(1)示例,由于水印14事件还没有到达,所以首先将窗口(1)示例的事件时间设置为29,然后假设水印14事件也到达窗口(1)示例,那么窗口(1)示例的事件时间设置为14(如果是这样,那么窗口(1)的事件时间将从29变为14,变小),也假设在源(2)生成水印39,然后到达窗口(1)示例之后,窗口(1)示例的事件时间将设置为29还是39?

wwwo4jvm

wwwo4jvm1#

简单的回答是不,在这种情况下窗口不会变小(实际上可能会抛出异常)。
这就是BoundedAutoForderness水印提取器发挥作用的地方。使用它,您可以配置时间戳的“无序”程度,它将消除这些差异。默认情况下,使用ascendingtimestamp提取器,接收顺序错误的时间戳实际上是一个错误。
此外,还有“允许延迟”的概念,它定义了在接收到低于当前水印的时间戳的情况下会发生什么。
例如,如果您知道数据源可能有60秒的抖动(由于处理时间延迟、地理距离等),则可以使用值为的有界无序提取器 (TimeUnit.SECONDS, 60) 这将有效地把你的整个Windows移到60年代。这将允许元素在60秒内以任何顺序出现。
但是,如果您实际希望元素完全按顺序或以非常小的抖动进入,但希望接受延迟的元素进行处理,则可以使用允许的延迟设置来定义这些元素进入时流程的行为。默认情况下,flink只是简单地删除它们,但是您可以配置一个时间段,flink将为传入的每个元素重新启动窗口。
从根本上说,所有这些都取决于您的特定情况,以及您期望数据输入的紧张程度和延迟程度,以及您如何处理延迟的元素。flink几乎允许任何设置的组合。

lg40wkob

lg40wkob2#

最后,我也从源代码处得到了答案,正如david所说的“窗口的水印将保持long.min\u值,直到来自两个输入流的较大值到达为止。”
https://github.com/apache/flink/blob/57b950796deebed46ae95f97152e09b2e2655de8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/abstractstreamoperator.java

public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
  combinedWatermark = newMin;
  processWatermark(new Watermark(combinedWatermark));
}
}

public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
  combinedWatermark = newMin;
  processWatermark(new Watermark(combinedWatermark));
}
}
syqv5f0l

syqv5f0l3#

Such as suppose the watermark event 29 arrives at the window(1) instance 
firstly, as the watermark 14 event hasn't arrived it, so the event time of 
window(1) instance was set to 29 firstly ...

这是不对的。在第一个正确的水印到达之前,将使用long.min\u值的占位符值。因此,窗口的水印将保持long.min\u值,直到两个输入流都有一个较大的值。

相关问题