我想知道flink是否适合以下用例。假设我有一个测量流(设备id,值),例如。
(1, 10.2), (2, 3.4), (3, 9.1), (1, 7.0), (3, 6.3), (5, 17.8)
我每分钟都要报告到目前为止看到的任何设备id的最新值。
根据数据:
data: (1, 10.2), (2, 3.4), (3, 9.1), (1, 7.0), (3, 6.3), (5, 17.8)
time: 0 ----------------- 1min -------------- 2min ------------------ 3min
我想要一个结果:
1: { (1, 10.2), (2, 3.4) }
2: { (1, 7.0), (2, 3.4), (3, 9.1) }
3: { (1, 7.0), (2, 3.4), (3, 6.3), (5, 17.8) }
我提出了包括
.windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1)).apply( ... )
但是在一个大的数据集上它看起来并不好(内存方面)。还有别的办法吗?
1条答案
按热度按时间qcuzuvrc1#
您可能需要考虑以下内容作为起点:
需要指出的是:
我不建议用windows来做这个。使用globalwindows,管理过期状态变得很复杂。
我使用了带标点水印的赋值函数,而不是升序时间戳抽取函数。我这样做有三个原因:(1)一旦切换到并行运行,可能很难确保事件按顺序到达(2) AscendingTimeStampExtractor定期生成水印(默认情况下,每200毫秒实时一次),在本例中,应用程序在生成第一个水印之前已经消耗了所有输入(3) processelement方法中的一个简单检查就是处理无序事件所需的全部。但是如果事件确实有序,那么在生产中使用ascendingtimestampextractor或BoundedAutoforErnessTimestampExtractor可能会更好。
输出如下所示:
(11120,2,10.0)在9999触发的原因是,正是这个时间戳为11120的事件的到来使水印前进到9999之后,导致计时器触发。在调用ontimer时,onelement已经被调用了。
ctx.timerservice().currentwatermark()<long.max\u值的检入计时器是这样,这个有限的示例不会永远运行。如果流式处理作业到达其输入的结尾,则注入时间戳为long.max\u值的最终水印,以导致任何剩余计时器的最后一次触发。在这种情况下,我们不应该创建另一个计时器。