赏金6天后到期。回答此问题可获得+50声望奖励。awethon想引起更多的注意**这个问题:
我希望看到一个最佳的管道与一些解释
我的活动如下: case class Event(user: User, stats: Map[StatType, Int])
每个事件都包含+1或-1个值。我有我目前的管道,工作良好,但产生新的事件,每一个变化的统计数据。
eventsStream
.keyBy(extractKey)
.reduce(reduceFunc)
.map(prepareRequest)
.addSink(sink)
在将它们与当前状态合并之前,我想在一个时间窗口中聚合这些增量。所以我想要同样的滚动减少,但有一个时间窗口。
当前简单滚动减少:
500 – last reduced value
+1
-1
+1
Emitted events: 501, 500, 501
带窗滚压:
500 – last reduced value
v-- window
+1
-1
+1
^-- window
Emitted events: 501
我试过天真的解决办法,把时间窗口就在前面 reduce
但是在阅读了文档之后,我发现reduce现在有了不同的行为。
eventsStream
.keyBy(extractKey)
.timeWindow(Time.minutes(2))
.reduce(reduceFunc)
.map(prepareRequest)
.addSink(sink)
似乎我应该做一个键控流,在缩短我的时间窗口后减少它:
eventsStream
.keyBy(extractKey)
.timeWindow(Time.minutes(2))
.reduce(reduceFunc)
.keyBy(extractKey)
.reduce(reduceFunc)
.map(prepareRequest)
.addSink(sink)
这是解决问题的正确途径吗?
1条答案
按热度按时间b0zn9rqh1#
可能有不同的选择,但其中之一是实施
WindowFunction
然后跑apply
开窗后:(
WindowFuntion
获取输入值类型、输出值类型和键类型的类型参数。)这里有一个例子。让我复制一下相关的片段:
我不知道你的数据看起来如何,所以我不能尝试一个完整的答案,但这应该作为灵感。