在使用基于键控状态的操作符处理流之后,我们的流不能再保证是有序的。
因此,我实现了一个排序操作符,它利用处理时间窗口来缓冲流元素,并以排序的方式发出它们。我读取了使用计时器和map状态的this suggestion。然而,我害怕注册很多计时器,并将window based solution修改为suggested by one answer。
它可以像这样插入到管道中:
someStream
.keyBy(k -> 1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(
new BufferedSorter<SomePojo, Integer>() {
@Override
protected TypeInformation<VehicleEvent> getTypeInfo() {
return TypeInformation.of(SomePojo.class);
}
@Override
protected Timestamp getTime(SomePojo element) {
return element.getEventTime();
}
})
字符串
虽然这工作得很好,因为可以接受的处理时间延迟是正确的,但我注意到 * WindowProcessFunction没有为空闲输入流执行 *。
一旦我将元素发布到流中,窗口就会被触发(与元素的事件时间无关)。这对我来说是违反直觉的,因为我期望窗口会被纯粹的处理时间提前触发。
1条答案
按热度按时间jmo0nnb31#
你的场景细节尚不清楚,但以下是相关的事实:
我想知道您是否在管道的一部分中使用了摄取时间逻辑,而不是处理时间,这可能会引起一些混乱。