我的flink工作必须在每次轮班后计算一定的聚合。班次是可配置的,看起来像:
1st shift: 00:00am - 06:00am
2nd shift: 06:00am - 12:00pm
3rd shift: 12:00pm - 18:00pm
出于运营目的,每天的班次都是相同的,一周中的几天/一年中的几天没有区别。班次配置可以随时间变化,也可以是非单调的,因此表中没有一个简单的事件时间窗口,如: TumblingEventTimeWindows.of(Time.of(6, HOURS))
因为有些班次可能会缩短或延长加班时间,或者可能会在中间插入几个小时的休息时间。。。
我基于一个globalwindow和一个自定义触发器想出了一些东西:
LinkedList<Shift> shifts;
datastream.windowAll(GlobalWindows.create())
.trigger(ShiftTrigger.create(shifts))
.aggregate(myAggregateFunction)
在我的自定义触发器中,我试图识别传入事件是否超过正在进行的工作班次的结束时间,并为班次触发窗口:
@Override
public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
// compute the end time of the on-going shift
final Instant currentShiftEnd = ...
// fire window for the shift if the event passes the end line
if (ShiftPredicate.of(currentShiftEnd).test(element)) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
省略了状态管理和一些记忆优化的代码,这在流式用例中似乎可以很好地工作:在一个班次结束时间之后出现的第一个事件触发了最后一个班次的触发和聚合。
但是,作业可以在有日期参数限制的情况下运行(例如:重新处理过去的期间),或者由于一系列预期的原因而提前关闭。当这种情况发生时,我观察到最后一个窗口没有被触发/刷新,
一天的最后一班在午夜结束,第二天的第一班就要开始了。晚上23:59有个活动,班次就要结束了。然而,今天的工作只是运行,并且在00:00结束。由于没有新元素到达通过该行触发窗口触发的自定义触发器,因此不会计算上一个班次的聚合,但是,即使在下一个班次中没有发生任何事情,或者作业在正在进行的班次中间终止,仍然会有部分结果。
我读到这样做的原因是:
flink保证只删除基于时间的窗口,而不删除其他类型的窗口,例如全局窗口(请参阅窗口分配程序)
我已经看了一眼房子里面 org.apache.flink.streaming.api.windowing
包来寻找像 TumblingEventTimeWindows
或者 DynamicEventTimeSessionWindows
我可以使用或延长一天的结束时间,这样当元素的水印超过窗口限制时,我就可以依赖这些触发的默认事件时间触发器,但我不知道如何做到这一点。直觉上我希望有这样的东西:
shifts.forEach(shift -> {
datastream.windowAll(EventTimeWindow.fromTo(DAILY, shift.startTime, shift.endTime))
.aggregate(myAggregateFunction);
});
我知道对于任意复杂的用例,有些人做的是抛弃windowsapi,不利于低级别的进程函数,他们通过将元素作为操作符的托管状态来“手动”计算窗口,而在给定的规则或条件下,他们从定义的聚合函数或累加器中拟合并提取结果。同样在过程函数中,可以通过点击 onClose
钩子。
有没有一种方法可以通过扩展windowsapi中的任何对象来获得每天特定时间内重复事件时间窗口的概念?
1条答案
按热度按时间dzhpxtsq1#
如果我理解正确,这里有两个单独的问题需要解决:
如何处理没有统一的窗口边界。
如何终止作业而不丢失最后一个窗口的结果。
对于(1),您使用
GlobalWindows
有一个习惯ShiftTrigger
是一条路要走。如果您想探索一种使用流程函数的替代方法,我已经编写了一个示例,您可以在flink文档中找到。为了获得更流畅的api,您可以创建一个自定义api
WindowAssigner
,这样就可以利用内置的EventTimeTrigger
作为默认触发器。为此,您需要实现WindowAssigner
接口。对于(2),只要您依赖于事件时间处理,最后一组窗口就不会被触发,除非在作业终止之前到达足够大的水印来关闭它们。这通常要求您有一个事件,其时间戳在窗口结束后足够长,这样就创建了一个足够大的水印来触发窗口(并且作业保持运行的时间足够长,以便发生这种情况)。
但是,当flink意识到流式处理作业即将自然结束时,它将自动注入一个水印,其时间戳设置为max\u watermark,其效果是触发所有事件时间计时器,并关闭所有事件时间窗口。对于任何有界源,这都会自动发生。例如,对于kafka,您还可以通过让反序列化程序从
isEndOfStream
.处理这个问题的另一种方法是避免在完成这些任务时取消它们,而是使用
./bin/flink stop --drain [-p savepointPath] <jobID>
以干净地停止作业(使用保存点),同时排出所有剩余的窗口结果(通过注入最后一个大水印(max\u watermark))来完成此操作。