如何在flink中构建1小时的重放流缓冲区?

s5a0g9ez  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(251)

我想动态地保留过去1小时事件的缓冲区。这个缓冲区应该给我一个重放函数,这样就可以对最后一小时的数据执行查询。flink中是否已经实现了一些东西?还是需要我自己建造?
我尝试使用windowapi,但是flink似乎没有给我一个固定宽度的时间窗口。

wn9m85ua

wn9m85ua1#

我有一个解决我自己问题的办法,但我想保留这个问题,以防你有更好的办法。因为我的肯定违反了函数式编程的一些好做法。
我的技巧如下。

val keyedEventStream: KeyedStream[E]

            // create a stream of [hourly window as a set of events]
            val eventWindowStream = keyedEventStream.timeWindow(Time.minutes(60), Time.milliseconds(50)).fold(scala.collection.Set[E]())((set: scala.collection.Set[E], event: E) => set + event)

            // This is the hourly buffer my process logic will use
            var workWindow = scala.collection.Set[E]()
            // update the workspace window with the stream of hourly window.
            eventWindowStream.map((set: scala.collection.Set[W]) => workWindow = set)

如您所见,last map的唯一目的是更新变量workwindow,这实际上是内联函数的副作用。。。

相关问题