flink的协处理器函数不会触发ontimer

gijlo24d  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(535)

我试着像那样把两条溪流聚合起来

val joinedStream = finishResultStream.keyBy(_.searchId)
  .connect(startResultStream.keyBy(_.searchId))
  .process(new SomeCoProcessFunction)

然后在实验室工作 SomeCoProcessFunction 像这样的班级

class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {

   override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = { 

       // aggregating some "finished" data ...

   }

   override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {

       val timerService = ctx.timerService()
       timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)

       // aggregating some "created" data ...
   }

   override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {

       val watermark: Long = ctx.timerService().currentWatermark()
       println(s"watermark!!!! $watermark")

       // clean up the state

   }

我想要的是在一段时间(5000毫秒)后清除状态,这就是 onTimer 必须用于。但既然它从来没有被炒过,我有点问自己我做错了什么?
提前谢谢你的提示。
更新:
解决方案是这样设置timeservice(对fabian hueske和beckham都是tnx): timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000) 我还是没弄清楚是什么 timerService.registerEventTimeTimer 是的,水印 ctx.timerService().currentWatermark() 始终显示 -9223372036854775808 现在不管eventtimer注册前多久。

kmbjn2e3

kmbjn2e31#

我看到你在用 System.currentTimeMillis 这可能与 TimeCharacteristic (事件时间、处理时间、摄取时间)flink作业使用的时间。
尝试获取事件的时间戳 ctx.timestamp() 再加上5000毫秒。

3j86kqsm

3j86kqsm2#

问题是您正在注册事件时间计时器( timerService.registerEventTimeTimer )具有处理时间戳( System.currentTimeMillis + 5000 ). System.currentTimeMillis 返回当前计算机时间,但事件时间不是基于计算机时间,而是基于从水印计算的时间。
您应该注册一个处理计时器,或者注册一个带有事件时间戳的事件时间计时器。您可以从中获取当前水印的时间戳或当前记录的时间戳 Context 作为参数传递给 processElement1() 以及 processElement2() .

相关问题