我在用flink状态函数做一个新项目。我已经使用functiontestharness编写了一些基本的单元测试,但是使用这种方法的测试无法测试有状态函数之间的交互。
flink测试文档(基本flink,不适用于有状态函数)演示了如何使用 MiniClusterWithClientResource
然后对作业的输出进行Assert。我正在寻找一种方法,用有状态函数做类似的事情。
statefun flink harness的例子看起来很有前途,但是 RunnerTest
使用线束标有 @Ignore
因为它永远不会终止。这对调试很有用,但不能用于自动测试。
到目前为止,我发现了一个问题,它使得编写一个以线束终止的测试变得困难:
harness使用serializablesupplier提供输入,serializablesupplier没有办法说它完成了。这意味着任何使用线束的测试总是在等待更多的输入。
如果线束知道所有输入都已发送,则需要一种在没有挂起事件时终止的方法。
另外,由于上下文发送的延迟事件,一些系统仍然不会终止 .sendAfter()
我认为这将是一种常见的需求,以支持可以从ci/cd进程运行的更有趣的自动化测试。有没有人找到了解决上述问题的方法,或者发现了一种完全不同的方法,使用的工具不是线束?
1条答案
按热度按时间agxfikkp1#
线束还包含一个
.withFlinkSourceFunction()
允许使用任何flink的方法SourceFunction
作为入口。您可以创建自己的源函数来生成有限的元素集合,例如:
然后,可以按以下方式修改线束示例:
在一个入口中产生两条输入消息后,这应该终止。如果您认为这是一个常见的要求,那么我鼓励您在flink邮件列表中提出这一点,我相信那里的友好社区会很乐意接受您的反馈意见,更愿意做出贡献;)
有没有人找到了解决上述问题的方法,或者发现了一种完全不同的方法,使用的工具不是线束?
对于ci/cd管道,我建议检查基于测试容器的e2e测试(例如这个)