我试图在Flink创建我的第一个实时分析工作。这种方法类似于kappa架构,所以我有关于kafka的原始数据,在那里我们接收到任何实体状态的每一个变化的消息。
因此信息的形式如下:
(id,newStatus, timestamp)
我们要计算,对于每个时间窗口,给定状态下的项目数。所以输出的形式应该是:
(outputTimestamp, state1:count1,state2:count2 ...)
或同等产品。这些行在任何给定时间都应包含给定状态下的项计数,其中与id关联的状态是为该id观察到的最新消息。在任何情况下,都应计算id的状态,即使事件比正在处理的事件早得多。因此,所有计数的总和应该等于系统中观察到的不同id的数目。接下来的步骤可能会在一段时间后忘记最后一个项目中的项目,但现在这不是一个严格的要求。
这将写在elasticsearch上,然后进行查询。
我尝试了许多不同的途径,但没有一条完全符合要求。使用滑动窗口,我可以很容易地实现预期的行为,只是当滑动窗口的开始超过事件的时间戳时,它会丢失计数,正如您所料。其他方法在处理积压工作时无法保持一致,因为我对密钥和时间戳做了一些技巧,但在一次处理完数据时失败了。
所以我想知道,即使是在高水平上,我应该如何处理这个问题。它看起来像是一个相对常见的用例,但是必须无限期地保留给定id的相关信息,以便正确地计算实体,这一事实会产生很多问题。
1条答案
按热度按时间umuewwlo1#
我想我有办法解决你的问题:
给予
DataStream
的(id, state, time)
作为:实际状态更改如下所示:
val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time)
.keyBy(_._1) // key by state
.timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling
.apply(new SumReducer(), new YourWindowFunction())
```
SumReducer
对CNT更新和YourWindowFunction
指定窗口的时间戳。此步骤聚合窗口中每个状态的所有状态更改。最后,我们使用计数更新来调整当前计数。