如何在通过reduce函数与当前状态合并之前聚合flink流中的事件?

lb3vh1jj  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(417)

赏金6天后到期。回答此问题可获得+50声望奖励。awethon想引起更多的注意**这个问题:

我希望看到一个最佳的管道与一些解释
我的活动如下: case class Event(user: User, stats: Map[StatType, Int]) 每个事件都包含+1或-1个值。我有我目前的管道,工作良好,但产生新的事件,每一个变化的统计数据。

eventsStream
    .keyBy(extractKey)
    .reduce(reduceFunc)
    .map(prepareRequest)
    .addSink(sink)

在将它们与当前状态合并之前,我想在一个时间窗口中聚合这些增量。所以我想要同样的滚动减少,但有一个时间窗口。
当前简单滚动减少:

500 – last reduced value
+1
-1
+1

Emitted events: 501, 500, 501

带窗滚压:

500 – last reduced value
v-- window
+1
-1
+1
^-- window

Emitted events: 501

我试过天真的解决办法,把时间窗口就在前面 reduce 但是在阅读了文档之后,我发现reduce现在有了不同的行为。

eventsStream
    .keyBy(extractKey)
    .timeWindow(Time.minutes(2))
    .reduce(reduceFunc)
    .map(prepareRequest)
    .addSink(sink)

似乎我应该做一个键控流,在缩短我的时间窗口后减少它:

eventsStream
    .keyBy(extractKey)
    .timeWindow(Time.minutes(2))
    .reduce(reduceFunc)
    .keyBy(extractKey)
    .reduce(reduceFunc)
    .map(prepareRequest)
    .addSink(sink)

这是解决问题的正确途径吗?

b0zn9rqh

b0zn9rqh1#

可能有不同的选择,但其中之一是实施 WindowFunction 然后跑 apply 开窗后:

eventsStream
    .keyBy(extractKey)
    .timeWindow(Time.minutes(2))
    .apply(new MyWindowFunction)

( WindowFuntion 获取输入值类型、输出值类型和键类型的类型参数。)
这里有一个例子。让我复制一下相关的片段:

/**User-defined WindowFunction to compute the average temperature of SensorReadings */
class TemperatureAverager extends WindowFunction[SensorReading, SensorReading, String, TimeWindow] {

  /**apply() is invoked once for each window */
  override def apply(
    sensorId: String,
    window: TimeWindow,
    vals: Iterable[SensorReading],
    out: Collector[SensorReading]): Unit = {

    // compute the average temperature
    val (cnt, sum) = vals.foldLeft((0, 0.0))((c, r) => (c._1 + 1, c._2 + r.temperature))
    val avgTemp = sum / cnt

    // emit a SensorReading with the average temperature
    out.collect(SensorReading(sensorId, window.getEnd, avgTemp))
}

我不知道你的数据看起来如何,所以我不能尝试一个完整的答案,但这应该作为灵感。

相关问题