我使用的管道如下:
inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
processBroadcastElement(...){...}
processElement(...){...}
}).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())
在 CustomTrigger()
,我正在注册 eventTimeTimer()
它会开火指示我的Windows的尽头。问题是 onEventTime()
方法不会被调用,即使在以下情况下:
我保证 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
使用 ascendingTimestampExtractor()
,我已经发送了一个事件,它确实将水印推得足够远,以至于 eventTimeTimer()
应该开火。
我错过了什么?是否与丢失的水印和 onTimer()
方法 KeyedBroadcastProcessFunction
? 我怀疑是这样,因为大卫·安德森在这个答案中的评论:
为非广播流添加特殊假水印(设置为watermark.max\u watermark)
我还没有实现一个名为ontimer的方法。然而,如果真的是这样的话,我不明白这和下游触发器有什么关系。谢谢。
编辑:这里是这个场景的完整示例。
1条答案
按热度按时间wvyml7n51#
是的,问题是广播流没有水印(但是不,如果
KeyedBroadcastProcessFunction
是否有ontimer方法。一旦你得到了流动的水印,他们将通过流到窗口而不管。)当一个操作符有两个或多个输入时——在您的例子中
inputStream
以及configurationBroadcastStream
是连接的——该运算符处的水印将是来自其输入的水印的最小值。由于广播流没有水印,这就抑制了inputStream
.我举了一个例子来说明你是如何处理这个问题的。假设您的广播流不需要任何定时信息,您可以实现一个时间戳提取器和水印赋值器,它可以有效地将水印控制权让给另一个流。像这样: