Flink 不关闭窗口与EventTimeWindows

fkvaft9z  于 2023-04-18  发布在  Apache
关注(0)|答案(2)|浏览(277)

为什么这段代码没有给予任何东西?如果我改为TumblingProcessingTimeWindows-所有工作正常。
我没有在文档中找到我还必须添加什么?触发器?驱逐器?允许迟到?

WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> Timestamp.valueOf(i.dt).getTime());

        ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((acc, i) -> {
                    acc.count += i.count;
                    acc.dt = i.dt;
                    return acc;
                }).addSink(new PrintSinkFunction());

输入:

{"userId":1,"count":11,"dt":"2023-04-11T09:29:12.244"}

系统时间=输入时间

更新二:

1.我在withTimestampAssigner中添加了一些打印信息-它在每个事件中都被调用。
1.我添加了OutputTag用于捕获丢弃的事件-它很清楚。
return new OutputTag(){};
1.我添加了debug print internal来减少函数-它在每个事件上都被调用。
但是print(sink)对于关闭输出窗口没有=(
所有代码:

private static void m4(DataStream<UserModel> ds) {
        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> {
                    long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    System.out.println(i.dt + " is: " + time +  " dont know: " + timestamp);
                    return time;
                });

        OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};

        SingleOutputStreamOperator<UserModel> reduce = ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sideOutputLateData(lateTag)
                .reduce((acc, i) -> {
                    System.out.println(i.dt + " reDUCE:");
                    acc.count += i.count;
                    acc.dt = i.dt;
                    return acc;
                });
                reduce.getSideOutput(lateTag).print();

                reduce.addSink(new PrintSinkFunction());

    }

更新3:

有趣的是,TumblingEventTimeWindows需要ProcessAllWindowFunction,尽管TumblingProcessingTimeWindows没有它也能工作
所以我添加了ProcessAllWindowFunction,但还是没有结果,我添加了print来调试,这部分代码没有调用
有趣的一点,如果我改变到TumblingProcessingTimeWindows,窗口关闭和下沉即使没有ProcessAllWindowFunction x1c 0d1x
所有代码:

public class Rich extends ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow>.Context context, Iterable<UserModel> iterable, Collector<UserModelEx> collector) throws Exception {
        UserModel um = iterable.iterator().next();
        System.out.println(um.count + " rich:" + um.dt);
        collector.collect(new UserModelEx() {{
            userId = um.userId;
            count = um.count;
            wStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getStart()), ZoneOffset.UTC);
            wEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getEnd()), ZoneOffset.UTC);
        }});
    }
}

 private static void m4(DataStream<UserModel> ds) {

        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> {
                    long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    System.out.println(i.dt + " assignEvent: " + time + " : " + timestamp);
                    return time;
                });

        SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce((acc, i) -> {
                    acc.count += i.count;
                    acc.dt = i.dt;
                    System.out.println(acc.dt + " reduce:" + acc.count);
                    return acc;
                }, new Rich());

        reduce.print();

        //reduce.addSink(new PrintSinkFunction<UserModelEx>());

    }
46scxncf

46scxncf1#

所以4天后我发现了问题,但我不能解释为什么它是必要的。
env.setParallelism(1);-解决了我的问题。
我正在从topic partition = 1阅读Kafka,即默认情况下并行度应该是1。
我希望Maven们能解释一下为什么我的例子需要这个参数...

qncylg1j

qncylg1j2#

通常情况下,你会提供一个ProcessAllWindowFunction与reduce调用,请参阅此API。ProcessAllWindowFunction.process方法将为每个窗口调用,并由迭代器提供单个元素(所有reduce调用的结果)。

相关问题