有没有可能编写一个使用flink statefun线束终止的单元测试?

6jygbczu  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(474)

我在用flink状态函数做一个新项目。我已经使用functiontestharness编写了一些基本的单元测试,但是使用这种方法的测试无法测试有状态函数之间的交互。
flink测试文档(基本flink,不适用于有状态函数)演示了如何使用 MiniClusterWithClientResource 然后对作业的输出进行Assert。我正在寻找一种方法,用有状态函数做类似的事情。
statefun flink harness的例子看起来很有前途,但是 RunnerTest 使用线束标有 @Ignore 因为它永远不会终止。这对调试很有用,但不能用于自动测试。
到目前为止,我发现了一个问题,它使得编写一个以线束终止的测试变得困难:
harness使用serializablesupplier提供输入,serializablesupplier没有办法说它完成了。这意味着任何使用线束的测试总是在等待更多的输入。
如果线束知道所有输入都已发送,则需要一种在没有挂起事件时终止的方法。
另外,由于上下文发送的延迟事件,一些系统仍然不会终止 .sendAfter() 我认为这将是一种常见的需求,以支持可以从ci/cd进程运行的更有趣的自动化测试。有没有人找到了解决上述问题的方法,或者发现了一种完全不同的方法,使用的工具不是线束?

agxfikkp

agxfikkp1#

线束还包含一个 .withFlinkSourceFunction() 允许使用任何flink的方法 SourceFunction 作为入口。
您可以创建自己的源函数来生成有限的元素集合,例如:

class FiniteSource<T extends Serializable> implements SourceFunction<T> {
    private final List<T> items;

    FiniteSource(List<T> items) {
      this.items = items;
    }

    @Override
    public void run(SourceContext<T> sourceContext) {
      for (T item : items) {
        sourceContext.collect(item);
      }
    }

    @Override
    public void cancel() {}
  }

然后,可以按以下方式修改线束示例:

FiniteSource<MyInputMessage> finiteSource = new FiniteSource<>(
            Arrays.asList(
                    new MyInputMessage("user-1", "hello"),
                    new MyInputMessage("user-2", "world")));

    Harness harness =
        new Harness()
            .withKryoMessageSerializer()
            .withFlinkSourceFunction(MyConstants.REQUEST_INGRESS,finiteSource)
            .withPrintingEgress(MyConstants.RESULT_EGRESS);

    harness.start();

在一个入口中产生两条输入消息后,这应该终止。如果您认为这是一个常见的要求,那么我鼓励您在flink邮件列表中提出这一点,我相信那里的友好社区会很乐意接受您的反馈意见,更愿意做出贡献;)
有没有人找到了解决上述问题的方法,或者发现了一种完全不同的方法,使用的工具不是线束?
对于ci/cd管道,我建议检查基于测试容器的e2e测试(例如这个)

相关问题