Flink 如果窗口在阅读完数据之前关闭,会发生什么情况?

vjhs03f7  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(163)

我们使用Apache Flink来读取一个Kafka主题,其中包含一个Id和一个对象列表,如下所示:

{
    Id: "12345",
    Objetcs: [
        {
            fatherId: "1a",
            id: "111",
            name: "aabc"
        },
        {
            fatherId: "1b",
            id: "222",
            name: "abffc"
        },
        {
            fatherId: "1a",
            id: "333",
            name: "gfds"
        },
        ...
    ]
}

然后我们将其转换为包含listId和乘积的元组数据流。最后我们执行一个KeyBy和一个10秒的TumblingProcessingTimeWindows,以便按listId和fhaterId对数据进行分组,并将分组结果转换为字符串,如下所示:“{ [ {fhaterId:“1a”,标识:“222”,名称:“aabc”},{相片ID:“1a”,标识:“333”,名称:“gfds”} ]、[ {发件人ID:“1a“,标识:“222”,名称:“aabc”} ] }”
问题在于,在某些测试中,我们发送了5个列表,每个列表包含128,000个数据,预期结果应该是5个字符串,但有时会因为其中一条消息被拆分而收到6个字符串。在上面的示例中,结果如下:字符串1:“{[{fhaterId:“1a”,标识:“222”,名称:“aabc”}]、[{发件人ID:“1a”,标识:“222”,名称:“aabc”}]}”字符串2:“{[{fhaterId:“1a”,标识:“333”,名称:“gfds”}]}”
当预期响应是单个字符串时。
原因何在?
流程代码如下:

DataStream<Result> sourceNegotiation = listNegotiationProducts
                .flatMap(new FlatMapFunction<ListNegotiationProduct, Tuple2<UUID, NegotiationProduct>>() {
                    @Override
                    public void flatMap(ListNegotiationProduct listNegotiationProduct, Collector<Tuple2<UUID, NegotiationProduct>> out) throws Exception {
                        listNegotiationProduct.getProducts().forEach(lnp -> {
                            Tuple2<UUID, NegotiationProduct> response = new Tuple2<>(listNegotiationProduct.getTransactionId(), lnp);
                            out.collect(response);
                        });
                    }
                })
                .keyBy(new KeySelector<Tuple2<UUID, NegotiationProduct>, Tuple2<UUID, Integer>>() {
                    @Override
                    public Tuple2<UUID, Integer> getKey(Tuple2<UUID, NegotiationProduct> value) throws Exception {
                        return Tuple2.of(value.f0, value.f1.getNegotiationId());
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(1))
                .apply(new WindowFunction<Tuple2<UUID, NegotiationProduct>, Tuple2<UUID, Negotiation>, Tuple2<UUID, Integer>, TimeWindow>() {
                    @Override
                    public void apply(Tuple2<UUID, Integer> uuidIntegerTuple2, TimeWindow window, Iterable<Tuple2<UUID, NegotiationProduct>> iterable, Collector<Tuple2<UUID, Negotiation>> collector) throws Exception {
                        Negotiation negotiation = new Negotiation();
                        Tuple2<UUID, Negotiation> response = new Tuple2<>();

                        List<Product> productList = new ArrayList<>();

                        iterable.iterator().forEachRemaining(negotiationProduct -> {

                            negotiation.setNegotiationId(negotiationProduct.f1.getNegotiationId());
                            response.setField(negotiationProduct.f0, 0);

                            List<String> observationList = new ArrayList<>();

                            observationList.add(negotiationProduct.f1.getObservation());

                            productList.add(Product
                                    .builder()
                                    .productGtin(negotiationProduct.f1.getProductGtin())
                                    .state(negotiationProduct.f1.getState())
                                    .observation(observationList)
                                    .retailerCode(negotiationProduct.f1.getRetailerCode()).build());
                        });

                        negotiation.setNegotiationProgressProducts(productList);

                        response.setField(negotiation, 1);
                        collector.collect(response);
                    }
                })
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(1))
                .apply(new WindowFunction<Tuple2<UUID, Negotiation>, Result, UUID, TimeWindow>() {
                    @Override
                    public void apply(UUID uuid, TimeWindow window, Iterable<Tuple2<UUID, Negotiation>> iterable, Collector<Result> collector) throws Exception {
                        List<Negotiation> negotiations = new ArrayList<>();
                        iterable.iterator().forEachRemaining(n -> {
                            negotiations.add(n.f1);
                        });
                        collector.collect(BuildResult.build(new Payload(negotiations), uuid));
                    }
                })
                .returns(Result.class);
yftpprvb

yftpprvb1#

我认为问题是TumblingProcessingTimeWindows与系统时钟对齐。有时候你会很不走运,一个事件会在11:48:09.988到达(例如),正好在一个窗口结束和另一个窗口开始之前。从它创建的一些元组会落在一个窗口中,而其余的会落在下一个窗口中。
Flink的DataStream窗口API并不适合这个用例。通过在API堆栈中向上或向下来解决这个问题会更容易。
如果您上一级到Table API,则可以使用over windows,它允许在相对于特定事件的时间间隔内进行分组。
如果您向下一级并使用KeyedProcessFunction,则可以直接使用状态和时间。例如,您可以像现在这样对流进行键控,然后在ListState中收集在10秒间隔内到达的所有事件。

k5ifujac

k5ifujac2#

在窗口()之后尝试.允许延迟(时间.秒(2))

编辑:见下面大卫的注解。

相关问题