flink:窗口不处理流末尾的数据

pgx2nnw8  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(892)

我有一个流(Kafka的msgs流到一个主题)与FlinkKafka消费者,我注意到一个有趣的行为,我期待解决。
当数据流传入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则不会触发管道的其余部分。
流程示例:

env.addSource(kafkaConsumer)
       .flatMap(new TokenMapper())
       .keyBy("word")
       .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
       .reduce(new CountTokens())
       .flatMap(new ConvertToString())
       .addSink(producer);

我使用的是flinkkafkaconsumer010,其env timecharacteristic设置为eventtime。和consumer.assigntimestampsandwatermarks(new periodicwatermarks())

private static class PeriodicWatermarks implements   AssignerWithPeriodicWatermarks<String>{

    private long currentMaxTimestamp;
    private final long maxOutOfOrderness;

    public PeriodicWatermarksAuto(long maxOutOfOrderness){
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public Watermark getCurrentWatermark() {
         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(String t, long l) {
        // this should be the event timestamp
        currentMaxTimestamp = l;
        logger.info("TIMESTAMP: " + l);
        return l;
    }
}

如果我的窗口是10秒,并且我的数据流只包含8秒的数据(然后在一段时间内停止流式处理),那么flatmap->sink将不处理,直到新的稍后的数据流进来。
示例数据流处理问题:(每个x是每秒一个数据块)

xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
      ^(not processed)           (until I get here)^

类似地,如果我有35秒的流数据(同样我的窗口是10秒),只有3个窗口的数据触发,剩下的5秒的数据从不处理。

...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
         (processed)        ^(not processed)          (until I get here)^

最后,如果我的窗口是10秒,而我只有5秒的流数据,那么flatmap->sink永远不会发生。
我的问题是,如果一段时间后我们看不到数据,有没有办法触发窗口化的数据进行处理?
如果我的数据是实时传输的,我可以看到没有数据的延伸,并且不希望最后一个窗口(假设只有5秒的数据)必须等待一些不确定的时间,直到新的数据进来,我希望在窗口时间过去后,最后一个窗口的结果。
大声想想,这似乎是由于使用eventtime而不是processingtime,或者,我的水印没有正确生成,最后一个窗口实际上触发。。。不确定也许两者都有一点?我想这对任何人来说都是个问题,如果流结束时最后一位没有触发。我想说,我可能会发送一个流结束味精,但这并没有帮助,如果蒸汽结束,因为来源打破了流。
编辑:所以我改为处理时间,它确实正确地处理了最后一个窗口中的数据,所以我猜事件时间是罪魁祸首,我想一个自定义触发器或适当的窗口水印可能是答案。。。
谢谢你的帮助!

e4yzc0pl

e4yzc0pl1#

我将把这个留给后人,因为这个问题正如我所想,与水印有关。时间戳和watermaker(来自assigntimestampsandwatermarks)调用'getCurrentWatermarks()',因为我正在将基于传入实体的水印设置为一个固定的数字(它们的时间戳-最大偏移量),所以它在看到新的实体之前不会更新。
我的解决方案是某种计时器,如果在可配置的时间内没有看到数据,则最终将水印提前到下一个窗口。我将无法处理非常潜在的数据,但我不认为这应该是一个问题。这是eventtime处理的预期行为。

相关问题