我的应用程序有一些集成测试,它们连接到本地kafka示例。我正在使用java kafkaserver api按需创建本地示例,当测试以类似于此问题的公认答案的方式运行时:
如何为junit测试示例化模拟kafka主题?
我的每个测试在单独运行时都会通过。我遇到的问题是,我的测试使用相同的Kafka主题,我希望主题开始每个测试不包含任何消息。但是,当我在系列中运行测试时,当所有测试在第一次运行后都尝试重新创建它们所需的主题时,会出现此错误:
kafka.common.TopicExistsException: Topic "test_topic" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
每个测试创建并关闭自己的embeddedzookeeper和kafkaserver。我还尝试从zk中删除“brokers/topics”路径,以及在每个测试结束时删除kafkaserver的logdir。不知何故,第一次考试的题目一直延续到第二次考试。
在每个测试结束时,我可以做些什么来确保它使用的主题不会干扰在它之后运行的测试?
1条答案
按热度按时间taor4pac1#
我终于能让它工作了。
我没有在每次测试后进行清理,而是将测试更改为在运行前进行清理。
我需要做两个清理步骤。
首先是在启动kafkaserver之前删除代理的数据目录。
第二种方法是在发送createtopic命令之前,在zookeeper中递归删除主题路径。
我尝试了许多类似的方法,但除了这个以外,其他方法都无法使用。
注意,代码是groovy而不是java。