我有一个简单的flink应用程序,它总结了最后一分钟内具有相同id和时间戳的事件:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));
pixels
.keyBy("id", "timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(dynamoDBSink);
env.execute(jobName);
我试图用文档中推荐的方法测试这个应用程序。我也看了这个stackoverflow问题,但是添加Flume没有帮助。
我确实有一个@classrule,就像我在测试课上推荐的那样。函数如下所示:
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
CollectSink.values.clear();
Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.keyBy("id","timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());
collectsink是从文档中复制的。
我做错什么了?有没有一种简单的方法来测试嵌入kafka的应用程序?
谢谢!
1条答案
按热度按时间sdnqo3pr1#
测试失败的原因是从未触发窗口。作业在窗口到达其分配的时间结束之前运行到完成。
原因与你处理时间的方式有关。通过指定
您正在安排相同id和时间戳在同一分钟内的所有事件位于同一窗口中。但是,因为您使用的是处理时间窗口(而不是事件时间窗口),所以直到测试运行的时间从一分钟跨越边界到下一分钟,您的窗口才会关闭。由于只有四个事件要处理,您的作业不太可能运行足够长的时间而发生这种情况。
您应该做的事情更像这样:将时间特性设置为事件时间,并提供时间戳提取器和水印赋值器。请注意,这样做,就不需要按时间戳(四舍五入到分钟边界)进行键控——这是事件时间窗口所做的工作的一部分。
有关事件时间、水印和窗口的详细信息,请参阅文档和教程。