Java listTopics throws TimeoutException with kafka-clients 2.6.0

uinbv5nw  posted on 2021-06-04  in Kafka

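I have a small piece of code that checks whether a particular topic already exists in Kafka. It works fine with kafka-clients 2.5.0, but after upgrading the Kafka client to 2.6.0 it started throwing a TimeoutException.
This is my original code.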

Properties adminProperties = new Properties();
adminProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = KafkaAdminClient.create(adminProperties);
boolean topicExists = adminClient.listTopics().names().get().contains("myDataTopic");

To troubleshoot, I split it up and tried raising some of the timeout values, as shown below. That did not help. It works in 2.5.1 but not in 2.6.0.

Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
adminProperties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "900000");
AdminClient adminClient = KafkaAdminClient.create(adminProperties);

System.out.println("createKafkaTopic(): Listing Topics...");
ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().timeoutMs(900000));
System.out.println("createKafkaTopic(): Retrieve Topic names...");
KafkaFuture<Collection<TopicListing>> setKafkaFuture = listTopicsResult.listings();
System.out.println("createKafkaTopic(): Display existing Topics...");
while (!setKafkaFuture.isDone()) {
    System.out.println("Waiting...");
    Thread.sleep(10);
}
Collection<TopicListing> topicNames = setKafkaFuture.get(900, TimeUnit.SECONDS);
System.out.println(topicNames);
System.out.println("createKafkaTopic(): Check if Topic exists...");
// TopicListing is not a String, so compare by name rather than calling contains("myDataTopic").
boolean topicExists = topicNames.stream().anyMatch(listing -> listing.name().equals("myDataTopic"));

Here is my output:

createKafkaTopic(): Listing Topics...
createKafkaTopic(): Retrieve Topic names...
createKafkaTopic(): Display existing Topics...
Waiting...
Waiting...
Waiting...
Waiting...
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1604903438670, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
    at KafkaUtil.createKafkaTopic(KafkaUtil.java:45)
    at KafkaUtil.main(KafkaUtil.java:21)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1604903438670, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
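
The innermost "Caused by" line says that the AdminClient thread has exited, but the trace itself does not show why it exited. As a troubleshooting aid, here is a minimal sketch that prints every layer of a cause chain on its own line. The class name CauseChain and the stand-in exceptions in main are hypothetical; in the real program the outer ExecutionException would come from the get() call on the KafkaFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

// Hypothetical helper, not part of kafka-clients.
final class CauseChain {
    /** Returns one "ClassName: message" entry per throwable in the cause chain, outermost first. */
    static List<String> describe(Throwable t) {
        List<String> out = new ArrayList<>();
        // Cap the depth so that a cyclic cause chain cannot loop forever.
        for (int depth = 0; t != null && depth < 32; depth++) {
            out.add(t.getClass().getName() + ": " + t.getMessage());
            t = t.getCause();
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in exceptions only; no Kafka classes are involved in this demo.
        Throwable root = new IllegalStateException("The AdminClient thread has exited.");
        Throwable wrapped = new ExecutionException(new RuntimeException("listTopics timed out", root));
        describe(wrapped).forEach(System.out::println);
    }
}
```

Calling CauseChain.describe(e) in a catch block around the get() call would list the same layers as the trace above. It does not reveal anything the trace omits, so the client's own log output is probably still the place to look for the reason the thread stopped.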

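I saw a similar question here (How to display topics using Kafka clients in Java?), but it seems to have been resolved by adding some dependencies. I also tried adding all of the dependencies to my pom.xml, without success.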
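
For reference, the existence check can be written with one bounded wait instead of an isDone()/sleep polling loop. This is only a sketch and is not expected to fix the 2.6.0 timeout. The class name TopicCheck and the method topicExists are hypothetical, and the sketch assumes that KafkaFuture implements java.util.concurrent.Future, so that the result of adminClient.listTopics().names() could be passed in directly. The demo in main uses an already completed CompletableFuture as a stand-in, so no broker is contacted.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of kafka-clients.
final class TopicCheck {
    /** Waits at most timeoutMs for the set of topic names and reports whether it contains topic. */
    static boolean topicExists(Future<Set<String>> names, String topic, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        // A single bounded get() replaces the isDone()/sleep polling loop.
        return names.get(timeoutMs, TimeUnit.MILLISECONDS).contains(topic);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for adminClient.listTopics().names() (assumption: it can be used as a Future).
        Set<String> known = Collections.singleton("myDataTopic");
        Future<Set<String>> names = CompletableFuture.completedFuture(known);
        System.out.println(topicExists(names, "myDataTopic", 1000));
    }
}
```

Because the names are plain strings here, contains() compares like with like, which is not the case when calling contains("myDataTopic") on a Collection<TopicListing>.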
