通过java kafkaserver api删除和重新创建kafka主题

mwngjboj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(300)

我的应用程序有一些集成测试,它们连接到本地kafka示例。我正在使用java kafkaserver api按需创建本地示例,当测试以类似于此问题的公认答案的方式运行时:
如何为junit测试示例化模拟kafka主题?
我的每个测试在单独运行时都会通过。我遇到的问题是,我的测试使用相同的Kafka主题,我希望主题开始每个测试不包含任何消息。但是,当我在系列中运行测试时,当所有测试在第一次运行后都尝试重新创建它们所需的主题时,会出现此错误:

kafka.common.TopicExistsException: Topic "test_topic" already exists.
    at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)

每个测试创建并关闭自己的embeddedzookeeper和kafkaserver。我还尝试从zk中删除“brokers/topics”路径,以及在每个测试结束时删除kafkaserver的logdir。不知何故,第一次考试的题目一直延续到第二次考试。
在每个测试结束时,我可以做些什么来确保它使用的主题不会干扰在它之后运行的测试?

taor4pac

taor4pac1#

我终于能让它工作了。
我没有在每次测试后进行清理,而是将测试更改为在运行前进行清理。
我需要做两个清理步骤。
首先是在启动kafkaserver之前删除代理的数据目录。

String dataDirectory = 'tmp/kafka'
    FileUtils.deleteDirectory(FileUtils.getFile(dataDirectory))

    Properties props = TestUtils.createBrokerConfig(BROKER_ID, port, true)
    props.put('log.dir', dataDirectory)
    props.put('delete.topic.enable', 'true')

    KafkaConfig config = new KafkaConfig(props)
    Time mock = new MockTime()
    kafkaServer = TestUtils.createServer(config, mock)

第二种方法是在发送createtopic命令之前,在zookeeper中递归删除主题路径。

zkClient.deleteRecursive(ZkUtils.getTopicPath(topicName))

    List<String> arguments = ['--topic', topicName, '--partitions', '1', '--replication-factor', '1']
    TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments as String[]))

我尝试了许多类似的方法,但除了这个以外,其他方法都无法使用。
注意,代码是groovy而不是java。

相关问题