我如何确保Kafka主题已被删除?

yjghlzjz  于 2022-09-21  发布在  Apache
关注(0)|答案(2)|浏览(149)

遗憾的是,Kafka的Admin.deleteTopics API只有在收到请求后才返回--这只意味着主题被集群计划删除,但不一定现在删除。

为了说明这一点,下面的代码通常会抛出:

final var newTopic = new NewTopic("aaa", Optional.empty(), Optional.empty());
this.admin.createTopics(Collections.singleton(newTopic), opt).all().get();
this.admin.deleteTopics(Arrays.asList("aaa")).all().get();
this.admin.listTopics( ).names().get().contains("aaa"); // Returns 'false'.
this.admin.createTopics(Collections.singleton(newTopic), opt).all().get(); // <- throws

但有一个例外:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'aaa' is marked for deletion.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
...
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'aaa' is marked for deletion.

不幸的是,Admin.listTopics()在这里没有帮助,主题在提交删除请求后不再可见。

所以问题是--有没有什么编程方法(最好是API)可以让我们监控这个主题是否真的消失了?

使用的Kafka(客户端和服务器)版本为3.2。

5gfr0r5j

5gfr0r5j1#

到目前为止,我还没有在API中发现任何有趣的东西,但我有一些可能的想法,不幸的是它们都很难看。

1.每个主题似乎都有多个指标--与其大小相关的Bean,例如kafka.log:type=Log,name=LogStartOffset,topic=aaa,partition=0。这样我们就可以等到它们消失。然而,为了完全确定,我们可能需要检查所有代理(或至少:所有托管副本的代理)。
1.Broker示例似乎有一个名为kafka.controller:type=KafkaController,name=TopicsToDeleteCount的Bean,这可能是我们想要的,但没有细粒度控制。而且,我们可能仍然需要检查集群中的所有代理,并等待它们全部达到0。
1.看看ZooKeeper/Kraft的内部结构。有趣的部分可能出现在/brokers/topics/admin/delete_topics等路径上,但这意味着带来另一个(ZooKeeper)依赖项。卡夫也是一个类似的案例,我们会深入挖掘经纪商的内部信息。

y53ybaqx

y53ybaqx2#

在删除主题后创建foo主题,这可能会加快Kafka的删除速度。然后创建有用的主题。我的测试发现有一定的效果。

public void reCreateTopics(Set<String> topics) {
    adminClient.deleteTopics(topics).all().get();
    createFooTopic4TriggeringDelete();

    // may be sleep 1s before creating
    TimeUnit.SECONDS.sleep(1);

    adminClient.createTopics(topics).all().get();
}

private void createFooTopic4TriggeringDelete()  {
    NewTopic foo = new NewTopic("foo_" + System.currentTimeMillis(), 1, (short) 1);
    adminClient.createTopics(Collections.singleton(foo)).all().get();
    adminClient.deleteTopics(Collections.singleton(foo.name())).all().get();
}

相关问题