java—如何使用kafka 1.1.0以编程方式创建主题

k7fdbhmy  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(452)

我最近升级到了Kafka1.1.0。我正在尝试为kafka消费者创建单元测试。为此,如果单元测试可以创建它用于测试的主题,那将是理想的。我找到了一些代码,看起来它应该做我想要的。但是,当我运行它时,它抛出一个异常:java.lang.nosuchmethoderror:org.apache.kafka.common.utils.utils.closequietly(ljava/lang/autocloseable;ljava/lang/string;)v
下面是我在第行找到的创建主题的代码:

@BeforeClass
public static void createTopic() {
   try (final AdminClient adminClient = AdminClient.create(configure())) {
        try {
            // Define topic
            NewTopic newTopic = new NewTopic("test-orders", 1, (short)1);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

但是,当我运行它时,它会抛出一个异常。

java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:334)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at com.sial.notifications.topics.OrdersTopicsTests.createTopic(OrdersTopicsTests.java:162)

我传递给它的唯一配置参数是引导服务器和client.id。我做错什么了?看起来很简单

tf7tbtn2

tf7tbtn21#

一个简单得多的方法是只配置kafka来自动创建任何您使用的主题,而这些主题并不存在于 auto.create.topics.enable Kafka的背景。这样做,就不需要额外的代码来创建主题。你只需使用你想要的任何主题名称,Kafka会在你使用它时为你创建它。

9q78igpj

9q78igpj2#

当我在1.1.0代理上独立运行时,这段稍微修改的代码对我很有用:

public static void main(String[] args) {
    final String ordersTopic = "test-orders";
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (final AdminClient adminClient = AdminClient.create(props)) {
        try {
            // Define topic
            NewTopic newTopic = new NewTopic(ordersTopic, 1, (short)1);

            // Create topic, which is async call.
            final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

            // Since the call is Async, Lets wait for it to complete.
            createTopicsResult.values().get(ordersTopic).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException))
                throw new RuntimeException(e.getMessage(), e);
        }
    }
}

由于这与您的代码非常相似,并且基于您看到的错误,您可能还没有完全整理出对kafka库的依赖关系?我用了maven神器 org.apache.kafka:kafka_2.12:1.1.0 .

相关问题