我想在flink中创建一个基于eventtime的会话窗口,这样当新消息的事件时间比创建该窗口的消息的事件时间长180秒以上时,它就会触发。
例如:
t1(0 seconds) : msg1 <-- This is the first message which causes the session-windows to be created
t2(13 seconds) : msg2
t3(39 seconds) : msg3
.
.
.
.
t7(190 seconds) : msg7 <-- The event time (t7) is more than 180 seconds than t1 (t7 - t1 = 190), so the window should be triggered and processed now.
t8(193 seconds) : msg8 <-- This message, and all subsequent messages have to be ignored as this window was processed at t7
我想创建一个触发器,以便通过适当的水印或一个临时触发器实现上述行为。有谁能举一些例子来实现这一点吗?
1条答案
按热度按时间xriantvc1#
最好的方法是使用processfunction,而不是自定义窗口。如果如您的示例所示,事件将按时间戳顺序处理,那么这将非常简单。另一方面,如果您必须处理无序事件(这在处理事件时间数据时很常见),那么它会稍微复杂一些(假设msg6和for time 187在t8之后到达。如果这是可能的,如果这会影响到你想要产生的结果,那么这必须得到处理。)
如果事件是有序的,那么逻辑大致如下:
使用ascendingtimestampextractor作为水印的基础。
使用flink状态(可能是liststate)存储窗口内容。当某个事件到达时,将其添加到窗口中,并检查第一个事件发生后是否超过180秒。如果是,请处理窗口内容并清除列表。
如果您的事件可能无序,请使用BoundedAutoFordernessTimestampExtractor,并且在currentwatermark指示事件时间超过窗口开始时间180秒之前不要处理窗口的内容(您可以为此使用事件时间计时器)。触发窗口时不要完全清除列表,只需删除属于正在关闭的窗口的元素即可。