我们的apachestorm拓扑使用kafkaspout监听来自kafka的消息,在做了大量的Map/减少/丰富/聚合等之后,etc最终将数据插入cassandra。还有另一个kafka输入,在这里我们接收用户对数据的查询,如果topology找到一个响应,那么它会将该响应发送到第三个kafka主题。现在我们想用junit编写e2e测试,在这个测试中我们可以直接以编程的方式将数据插入到拓扑中,然后通过插入用户查询消息,我们可以在第三点Assert在查询中收到的响应是正确的。
为了实现这一点,我们考虑启动嵌入式kafka和cassandraunit,然后用它们替换实际的kafka和cassandra,然后我们可以在这个junit测试的上下文中启动拓扑。
在开始实际测试之前,我们创建拓扑并将其提交到localcluster中。它在一个不同的线程上启动拓扑,然后从前面出来并开始执行我们的测试。到那时,拓扑还没有准备好,因为它需要一些时间来准备处理。有没有javaapi可以告诉我们拓扑何时准备好处理(意味着准备好从spout读取第一条消息)?
1条答案
按热度按时间nkcskrwz1#
这取决于你所说的“准备处理”是什么意思。
如果为localcluster启用时间模拟,则可以使用
Time.advanceClusterTime
分步前进。如果在提交拓扑后调用此方法,则仅当集群大部分处于空闲状态时,它才会返回。参见示例。https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/topologyintegrationtest.java#l233.如果您愿意用短棒(例如固定双管嘴)替换您的管嘴,您可以使用
Testing.completeTopology
等待拓扑处理完设置存根要发出的所有元组。等待拓扑处理一些元组的另一种方法是,将一些消息放入kafka中,启动拓扑,然后让测试线程轮询cassandra,看看期望的消息是否已经通过。这样,您就可以在测试线程中设置一个超时,如果在几秒钟内没有满足条件,测试就会失败。你可以使用一个实用程序,如等待https://github.com/awaitility/awaitility,或者只编写自己的轮询逻辑。
如果您所说的“准备处理”是指其他内容,请更详细地描述您的意思。