如何正确测试flink窗口函数?

f87krz0w  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(709)

有人知道如何测试窗口功能吗 Flink ? 我正在使用依赖关系 flink-test-utils_2.11 .
我的步骤是:
去拿那个 StreamExecutionEnvironment 创建对象并添加到环境中
做一个 keyBy 添加会话窗口
执行聚合函数

public class AggregateVariantCEVTest extends AbstractTestBase {

   @Test
    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
               .aggregate(new MyAgreggateFunction());

       JobExecutionResult result = env.execute();

       assertEquals(myExpectedResults, result.getAllAccumulatorResults());

   }
}

问题是 result.getAllAccumulatorResults() 大小为0。
你知道我做错了什么吗?提前谢谢!

gwbalxhn

gwbalxhn1#

windows不会将结果放入累加器。您应该在作业中附加一个测试接收器,然后将该接收器的内容与您期望的内容进行比较。类似于集成测试一节中文档中显示的内容。

wfsdck30

wfsdck302#

可能正确的方法是使用 TestHarness . 一个很好的例子是 WindowOperatorTest 在flink项目本身。
此外,您还可以结帐https://github.com/knaufk/flink-testing-pyramid 例如如何在测试金字塔的不同层次上测试flink作业,以及关于测试的flink文档https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.

相关问题