使用broadcaststate模式时,如何激发下游oneventtime()方法?

jk9hmnmh  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(253)

我使用的管道如下:

inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
     processBroadcastElement(...){...}
     processElement(...){...}
     }).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())

CustomTrigger() ,我正在注册 eventTimeTimer() 它会开火指示我的Windows的尽头。问题是 onEventTime() 方法不会被调用,即使在以下情况下:
我保证 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 使用 ascendingTimestampExtractor() ,我已经发送了一个事件,它确实将水印推得足够远,以至于 eventTimeTimer() 应该开火。
我错过了什么?是否与丢失的水印和 onTimer() 方法 KeyedBroadcastProcessFunction ? 我怀疑是这样,因为大卫·安德森在这个答案中的评论:
为非广播流添加特殊假水印(设置为watermark.max\u watermark)
我还没有实现一个名为ontimer的方法。然而,如果真的是这样的话,我不明白这和下游触发器有什么关系。谢谢。
编辑:这里是这个场景的完整示例。

wvyml7n5

wvyml7n51#

是的,问题是广播流没有水印(但是不,如果 KeyedBroadcastProcessFunction 是否有ontimer方法。一旦你得到了流动的水印,他们将通过流到窗口而不管。)
当一个操作符有两个或多个输入时——在您的例子中 inputStream 以及 configurationBroadcastStream 是连接的——该运算符处的水印将是来自其输入的水印的最小值。由于广播流没有水印,这就抑制了 inputStream .
我举了一个例子来说明你是如何处理这个问题的。假设您的广播流不需要任何定时信息,您可以实现一个时间戳提取器和水印赋值器,它可以有效地将水印控制权让给另一个流。像这样:

// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.

public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return 0;
    }
}

相关问题