java—为什么在创建kafka集群未运行的主题时adminclient没有失败?

vatpfxk5  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(669)

为什么当我运行下面的生产者代码而kafka甚至没有运行时,我没有收到一条错误消息?
我希望 createTopics 方法引发异常,但它不会发生。为什么?

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events", 1, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic));
adminClient.close();
q35jwt9p

q35jwt9p1#

createtopics方法返回 CreateTopicsResultKafkaFutures 作为价值观。因为您当前没有阻止完成此操作的代码(使用 get )而且不捕捉任何异常,您的代码只会正常运行,而不会通知代理不可用。
下面的代码将抛出 ExecutionException 当您的经纪人不在时:

final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events-test", 1, (short) 1);
CreateTopicsResult topicResult = adminClient.createTopics(Arrays.asList(newTopic));
KafkaFuture<Void> resultFuture = topicResult.all();
try {
    resultFuture.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

我使用kafka客户端2.5.0进行了测试,下面是示例:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.michael.big.data.kafka.java.BasicAdminClient.main(BasicAdminClient.java:27)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

要注意班级 Admin (上流社会) AdminClient )注解为“正在发展”,在将来的版本中可能会更改。

相关问题