java—强制收回滑动事件窗口以在flink上处理(历史流)

tktrz96b  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(531)

目前,我正在使用flink对流处理引擎进行研究。在我的研究中,我使用历史流,它由以下形式的元组组成: event_time, attribute_1, ..., attribute_X 哪里 event_time 用作 TimeCharacteristic.EventTime 在处理过程中。此外,我将数据集推入处理拓扑,方法是:(i)创建内存结构,或(ii)读取csv文件本身。
不幸的是,我注意到即使窗口操作符中有足够的元组来完成一个完整的窗口,该窗口也不会被推到下游进行处理。结果,性能明显下降,很多时候我都有一个 OutOfMemoryError 例外情况(有大量历史数据流)。
为了说明一个典型的用例,我提供了以下示例:

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
    l.add(new Tuple2<>(1L, 11));
    l.add(new Tuple2<>(2L, 22));
    l.add(new Tuple2<>(3L, 33));
    l.add(new Tuple2<>(4L, 44));
    l.add(new Tuple2<>(5L, 55));
    DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
    stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;
            }
        })
        .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2), 
                Time.milliseconds(1)))
        .sum(1)
        .print();
    env.execute();

根据 l 的内容,我需要有以下窗口结果:
[0,2]和:11
[1,3]总和:33
[2,4]总和:55
[3,5]总和:77
[4,6]总和:99
[5,7]总和:55
每个列表项可以被读取为[开始时间戳,结束时间戳],sum:x。
我希望flink每次出现一个时间戳超过打开窗口的结束时间戳的元组时,都会产生一个窗口化的结果。例如,我希望窗口[1,3]的求和会在具有timestamp的元组 4L 输入窗口操作符。但是,当来自的所有元组 l 被推送到流的拓扑中。当我处理较大的历史流时,也会发生同样的情况,这会导致性能下降(甚至耗尽内存)。
问:如何在窗口完成时强制flink将窗口推到下游进行处理?
我相信这是为了 SlidingEventTimeWindows 窗口的逐出是由水印触发的。如果前面的是真的,我如何编写拓扑,以便它们在具有稍后时间戳的元组到达时触发窗口?
谢谢您

sr4lhrrt

sr4lhrrt1#

AscendingTimestampExtractor 使用周期性水印策略,其中flink将调用 getCurrentWatermark() 方法,其中n是自动水印间隔。
默认间隔为200毫秒,与窗口的大小相比,这是非常长的。然而,它们并不具有直接的可比性——200毫秒是以处理时间而不是事件时间来度量的。不过,我怀疑如果您没有更改此配置设置,那么在发出第一个水印之前会创建很多窗口,我认为这可以解释您看到的情况。
您可以减少自动水印的间隔(也许到1毫秒)。或者您可以实现一个带有标点水印的赋值器,这将给您更多的控制。

相关问题