val keyedEventStream: KeyedStream[E]
// create a stream of [hourly window as a set of events]
val eventWindowStream = keyedEventStream.timeWindow(Time.minutes(60), Time.milliseconds(50)).fold(scala.collection.Set[E]())((set: scala.collection.Set[E], event: E) => set + event)
// This is the hourly buffer my process logic will use
var workWindow = scala.collection.Set[E]()
// update the workspace window with the stream of hourly window.
eventWindowStream.map((set: scala.collection.Set[W]) => workWindow = set)
1条答案
按热度按时间wn9m85ua1#
我有一个解决我自己问题的办法,但我想保留这个问题,以防你有更好的办法。因为我的肯定违反了函数式编程的一些好做法。
我的技巧如下。
如您所见,last map的唯一目的是更新变量workwindow,这实际上是内联函数的副作用。。。