有人知道如何测试窗口功能吗 Flink
? 我正在使用依赖关系 flink-test-utils_2.11
.
我的步骤是:
去拿那个 StreamExecutionEnvironment
创建对象并添加到环境中
做一个 keyBy
添加会话窗口
执行聚合函数
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
问题是 result.getAllAccumulatorResults()
大小为0。
你知道我做错了什么吗?提前谢谢!
2条答案
按热度按时间gwbalxhn1#
windows不会将结果放入累加器。您应该在作业中附加一个测试接收器,然后将该接收器的内容与您期望的内容进行比较。类似于集成测试一节中文档中显示的内容。
wfsdck302#
可能正确的方法是使用
TestHarness
. 一个很好的例子是WindowOperatorTest
在flink项目本身。此外,您还可以结帐https://github.com/knaufk/flink-testing-pyramid 例如如何在测试金字塔的不同层次上测试flink作业,以及关于测试的flink文档https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.