如何实时计算每个事件在过去2小时内的计数?

zyfwsgd6  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(347)

我们用flink监视每一个事件。详细场景是当一个事件到达时,flink找出过去2小时内所有具有相同userid的事件,并对count字段求和。例如:

event1<userid1, n1, t0>     -> real time result = n1
    event2<userid2, n2, t0+1h>  -> real time result = n2
    event3<userid1, n3, t0+1h>  -> real time result = n1+n3
    event4<userid1, n4, t0+2.5h>  -> real time result = n3+n4

我们如何在flink中实现这样的场景?直观地说,我们想使用滑动窗口,但有两个问题:
在flink中,滑动窗口按参数slide大小滑动。但是,在我们的场景中,每个事件的窗口都会滑动,这意味着每个事件的窗口的开始/结束点是不同的(预期的窗口范围:[eventtime-2h,eventtime])。我们应该通过设置一个小的幻灯片大小(10ms)来实现这一点吗?
进程函数是由触发器函数执行的,这意味着我们不能在事件到达时立即得到结果?

72qzrwbm

72qzrwbm1#

你可以继续使用滑动窗口,但使用你自己的 Trigger 发射到达窗口的元素。示例代码可能如下:

src.map(x => new Tuple2(x.id, x.value))
  .keyBy(0)
  .timeWindow(Time.seconds(2), Time.seconds(1))
    .trigger(new Trigger[Tuple2[String, Int], TimeWindow] {
      override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        TriggerResult.CONTINUE
      }

      override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        TriggerResult.FIRE
      }

      override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
      }

      override def onElement(element: Tuple2[String, Int], timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        TriggerResult.FIRE
      }
    })
  .sum(1)
jdzmm42g

jdzmm42g2#

你可以通过使用 ProcessFunction . 这是细节。

相关问题