我试图做一个简单的基于Kafka源代码的滑动窗口聚合。
Kafka上的事件都包含一个timestamp元素,并且按升序排列。我尝试过使用不同的周期性水印(升序的、有界的和自定义的,以便更容易地调试内部发生的事情)。我看得出 extractTimestamp
方法总是被调用,但是 getCurrentWatermark
方法从未被调用。
我已经定好了 autoWatermarkInterval
即使是1ms,每个子任务的水印也不会更新。我已经通过使用flinkui和查看可用的度量来验证了这一点。
关于这个主题,我读过很多类似的问题,大多数都是关于Windows从不发光的,原因有很多。我还没能找出为什么它永远不能提升水印的原因。
我还确认没有数据作为延迟数据进行侧输出。
最基本的形式是:
val rfq = kafkaDataStream
.assignAscendingTimestamps(_.timestamp.toEpochMilli)
.keyBy("id")
val lateTag = new OutputTag[RFQ]("late") {}
val predictions: DataStream[RFQPrediction] = rfq
.window(SlidingEventTimeWindows.of(5,3))
.sideOutputLateData(lateTag)
.aggregate(new PricePredictionsAggregate)
.name("windowed-predictions")
我已经证实了它和一个 AssignerWithPunctuatedWatermarks
.
是什么原因造成的 getCurrentWatermark
方法,即使间隔设置为1ms也不会被调用?
我提供的测试数据使用有限的id列表,为这些id不断生成事件,时间戳不断增加。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!