Flink集成测试:输入源的顺序依赖性

xdnvmnnf  于 12个月前  发布在  Apache
关注(0)|答案(2)|浏览(136)

我想要的是

我想在flink中实现一个窗口函数的自动化测试,以确保累加器和窗口处理函数一起创建预期的聚合。

我所做的一切

我根据Flink integration test sample实现了一个flink集成测试。我示例化了完整的flink应用程序,注入了硬编码的源代码和一个用于接收器的spy,窗口函数的结果将传播到该接收器。乍一看,这很好用。

What's the issue

这个测试是不可靠的。有时候有用有时候没用。

假设

我们认为这可能是测试期间处理输入源的时间问题。
有两个输入流:zones(具有几何形状的区域)和events(具有坐标)。作为flink应用程序的一部分,区域被广播并与事件(空间上)连接,因此下游,所有事件都被分配到它们发生的区域(使用BroadcastProcessFunction)。在至少一个区域的几何结构内不发生的事件被丢弃,并且与下游处理(诸如待测试的窗口操作)不相关。
在测试中,我们需要确保zone流在事件流之前被处理。否则,分配给区域的事件可能不会被创建,并且因此不会被聚合。

解决方案示意图

在考虑这个问题时,我们想到了三种不同的方法来处理时间问题:

缩小集成测试范围

我们可以在集成测试期间设置一个简化的版本,而不是示例化完整的应用程序,该版本从已经具有分配给区域的事件的输入源开始。没有多个输入源=>没有时序问题。然而,这会导致我们实际上想要测试的代码重复。此外,最好有一个完整的端到端集成测试,因为在某些情况下,多个运营商联合提供价值。

让一个源等待另一个源

我们尝试了zone源代码在完成时表示,并让event源代码等待。但是CompletableFuture的使用是不可能的,因为它(本质上)是不可序列化的。我看过Flink - Integration testing with multiple sources,但我希望有一个更flink内置的替代方案(解决方案中有很多样板代码)。

使用保存点启动

我们首先处理zone流(使用空的event源),然后获取保存点(在测试的@Before中),然后从@BeforeEach测试中恢复。但据我所知,我只能从保存点恢复完整的图,而从它恢复时,我想交换event源代码的源提供一个专门的事件列表。
救命!
如果能获得经验丰富的Flink开发人员对如何在集成测试中处理多个(依赖)源的见解,那就太好了。

nvbavucw

nvbavucw1#

您所描述的问题不仅仅存在于集成测试中。设想在生产中,区域数据的源被延迟(例如,运维团队在Kafka集群上进行维护)-您不想丢弃一堆初始事件,对吧?
处理这种情况的一种方法是在状态中缓冲任何要被丢弃的候选事件,使用状态TTL或计时器来等待您愿意等待的最长时间。
然后,在集成测试中,您将TTL设置为某个值,该值在“足够长以始终工作”和“不要导致测试运行太长时间”之间取得平衡。不理想,但可行。
或者你可以做你提到的,你做一个“冷启动”的应用程序,与一个空的事件源,采取一个保存点,并在正常模式下从你的保存点重新启动。这是我经常烘焙到生产应用程序中的功能,因为它消除了缓冲的需要。
但它确实使测试代码变得更加复杂,因为您必须异步启动作业,等待直到某个指示器(通常是计数器)告诉您所有广播数据都已处理,获取保存点,停止作业,然后再次启动它。

x759pob2

x759pob22#

因为我不喜欢仅仅为了可测试性而调整管道的想法,所以我遵循了@till-rohrmanns solution sketch
我们不是在测试中创建管道,而是示例化完整的Flink管道,注入模拟,然后可以Assert。
我记录了整个方法in a Gist on Github
尽管如此,如果有这样做的本地方法(或来自flink-test-utils的助手,我还不知道),我会很乐意从有经验的开发人员那里阅读!

相关问题