测试flink窗口

pcrecxhr  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(669)

我有一个简单的flink应用程序,它总结了最后一分钟内具有相同id和时间戳的事件:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));

pixels
        .keyBy("id", "timestampRoundedToMinutes")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(dynamoDBSink);

env.execute(jobName);

我试图用文档中推荐的方法测试这个应用程序。我也看了这个stackoverflow问题,但是添加Flume没有帮助。
我确实有一个@classrule,就像我在测试课上推荐的那样。函数如下所示:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

CollectSink.values.clear();

Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();

env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
    .keyBy("id","timestampRoundedToMinutes")
    .timeWindow(Time.minutes(1))
    .sum("constant")
    .addSink(new CollectSink());

JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());

collectsink是从文档中复制的。
我做错什么了?有没有一种简单的方法来测试嵌入kafka的应用程序?
谢谢!

sdnqo3pr

sdnqo3pr1#

测试失败的原因是从未触发窗口。作业在窗口到达其分配的时间结束之前运行到完成。
原因与你处理时间的方式有关。通过指定

.keyBy("id","timestampRoundedToMinutes")

您正在安排相同id和时间戳在同一分钟内的所有事件位于同一窗口中。但是,因为您使用的是处理时间窗口(而不是事件时间窗口),所以直到测试运行的时间从一分钟跨越边界到下一分钟,您的窗口才会关闭。由于只有四个事件要处理,您的作业不太可能运行足够长的时间而发生这种情况。
您应该做的事情更像这样:将时间特性设置为事件时间,并提供时间戳提取器和水印赋值器。请注意,这样做,就不需要按时间戳(四舍五入到分钟边界)进行键控——这是事件时间窗口所做的工作的一部分。

public static void main(String[] args) throws Exception {
    ...

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(new CollectSink());

    env.execute();
}

private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
    public TimestampsAndWatermarks() {
        super(/* delay to handle out-of-orderness */);
    }

    @Override
    public long extractTimestamp(Event event) {
        return event.timestamp;
    }
}

有关事件时间、水印和窗口的详细信息,请参阅文档和教程。

相关问题