目前,我正在使用flink对流处理引擎进行研究。在我的研究中,我使用历史流,它由以下形式的元组组成: event_time, attribute_1, ..., attribute_X
哪里 event_time
用作 TimeCharacteristic.EventTime
在处理过程中。此外,我将数据集推入处理拓扑,方法是:(i)创建内存结构,或(ii)读取csv文件本身。
不幸的是,我注意到即使窗口操作符中有足够的元组来完成一个完整的窗口,该窗口也不会被推到下游进行处理。结果,性能明显下降,很多时候我都有一个 OutOfMemoryError
例外情况(有大量历史数据流)。
为了说明一个典型的用例,我提供了以下示例:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));
DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
stream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
return t.f0;
}
})
.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
Time.milliseconds(1)))
.sum(1)
.print();
env.execute();
根据 l
的内容,我需要有以下窗口结果:
[0,2]和:11
[1,3]总和:33
[2,4]总和:55
[3,5]总和:77
[4,6]总和:99
[5,7]总和:55
每个列表项可以被读取为[开始时间戳,结束时间戳],sum:x。
我希望flink每次出现一个时间戳超过打开窗口的结束时间戳的元组时,都会产生一个窗口化的结果。例如,我希望窗口[1,3]的求和会在具有timestamp的元组 4L
输入窗口操作符。但是,当来自的所有元组 l
被推送到流的拓扑中。当我处理较大的历史流时,也会发生同样的情况,这会导致性能下降(甚至耗尽内存)。
问:如何在窗口完成时强制flink将窗口推到下游进行处理?
我相信这是为了 SlidingEventTimeWindows
窗口的逐出是由水印触发的。如果前面的是真的,我如何编写拓扑,以便它们在具有稍后时间戳的元组到达时触发窗口?
谢谢您
1条答案
按热度按时间sr4lhrrt1#
AscendingTimestampExtractor
使用周期性水印策略,其中flink将调用getCurrentWatermark()
方法,其中n是自动水印间隔。默认间隔为200毫秒,与窗口的大小相比,这是非常长的。然而,它们并不具有直接的可比性——200毫秒是以处理时间而不是事件时间来度量的。不过,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多窗口,我认为这可以解释您看到的情况。
您可以减少自动水印的间隔(也许到1毫秒)。或者您可以实现一个带有标点水印的赋值器,这将给您更多的控制。