flink流处理窗口中的后期数据采集

ukxgm1gy  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(539)

假设我有一个包含事件时间数据的数据流。我想在8毫秒的窗口时间内收集输入数据流,并减少每个窗口的数据。我使用以下代码:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()

要点:数据流的关键是处理时间的时间戳,例如Map到处理毫秒时间戳的最后8个子倍数 1531569851297 将Map到 1531569851296 .
但有可能数据流到达晚了,进入了错误的窗口时间。例如,假设我将窗口时间设置为8毫秒。如果数据按顺序进入flink引擎,或者至少延迟小于窗口时间(8毫秒),这将是最好的情况。但是假设数据流事件时间(也就是数据流中的一个字段)已经到达,延迟为30毫秒。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以在这么晚的数据过滤。所以我有两个问题:
我如何过滤数据流,因为它想进入窗口,并检查是否在正确的时间戳为窗口创建的数据?
我怎样才能在一个变量中收集这么晚的数据来对它们进行处理呢?

uwopmtnx

uwopmtnx1#

flink有两种不同的、相关的抽象,它们处理在具有事件时间戳的流上计算窗口分析的不同方面:水印和允许延迟。
首先是水印,它在处理事件时间数据时起作用(无论是否使用windows)。水印提供了有关事件时间进度的信息,并为应用程序编写者提供了处理无序数据的方法。水印和数据流一起流动,每个水印在数据流中标记一个位置并携带一个时间戳。水印作为一种Assert,在流中的那个点,流现在(可能)已经完成,直到那个时间戳——或者换句话说,水印后面的事件不太可能来自水印指示的时间之前。最常见的水印策略是使用BoundedAutoFordernessTimestampExtractor,它假设事件在某个固定的、有界的延迟内到达。
这现在提供了延迟的定义——时间戳小于水印时间戳的水印后面的事件被认为是延迟的。
windowapi提供了允许延迟的概念,默认设置为零。如果允许的延迟大于零,那么事件时间窗口的默认触发器将接受延迟事件进入其相应的窗口,直到允许的延迟限制为止。窗口操作将在正常时间触发一次,然后针对每个延迟事件再次触发,直到允许的延迟间隔结束。之后,将丢弃延迟事件(如果配置了侧输出,则将其收集到侧输出)。

How can I filter data stream as it wants to enter the window and check 
if the data created at the right timestamp for the window?

flink的窗口分配程序负责将事件分配到适当的窗口——正确的事情将自动发生。将根据需要创建新的窗口示例。

How can I gather such late data in a variable to do some processing on them?

您可以对水印进行足够慷慨的处理,以避免出现任何延迟数据,和/或将允许的延迟时间配置为足够长,以容纳延迟事件。但是,请注意,flink将被迫保持所有仍在接受后期事件的窗口打开,这将延迟垃圾收集旧窗口,并可能消耗大量内存。
请注意,本讨论假设您希望使用时间窗口—例如,您正在使用的8毫秒长的窗口。flink还支持计数窗口(例如,将事件分为100个批)、会话窗口和自定义窗口逻辑。例如,如果使用计数窗口,则水印和延迟不起任何作用。
如果你想为你的分析按键结果,那么在应用窗口之前使用keyby按键(例如,按userid)对流进行分区。例如

stream
  .keyBy(e -> e.userId)
  .timeWindow(Time.seconds(10))
  .reduce(...)

将为每个userid生成单独的结果。
更新:请注意,在最新版本的flink中,windows现在可以将最新事件收集到一个side输出。
一些相关文件:
事件时间和水印
允许迟到

相关问题