kafka:无法创建嵌入式kafka服务器

fgw7neuy  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(489)

更新:
下面是我试图遵循的示例代码。
https://gist.github.com/asmaier/6465468
它应该适用于Kafka0.10.x
我正在尝试创建一个嵌入式Kafka服务器作为

EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
        String zkConnect = ZKHOST + ":" + zkServer.port();
        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

        // setup Broker
        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", zkConnect);
        brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
        brokerProps.setProperty("listeners", "PLAINTEXT://:" + BROKERPORT);
        brokerProps.setProperty("advertised.listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT);
        //brokerProps.setProperty("security.inter.broker.protocol","PLAINTEXT");
        KafkaConfig config = new KafkaConfig(brokerProps);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);
        logger.info("TestKafkaServer created");

但我明白了

java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:183)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.log.Defaults$.<init>(LogConfig.scala:35)
    at kafka.log.Defaults$.<clinit>(LogConfig.scala)
    at kafka.log.LogConfig$.<init>(LogConfig.scala:246)
    at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:270)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:795)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:797)
    at com.ibm.whi.bap.helper.test.kafka.KafkaServerTest.<init>(KafkaServerTest.java:56)
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.checkAllProperties(KafkaTest.java:115)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

java.lang.NullPointerException
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.tearDown(KafkaTest.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37)
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

KafkaConfig config = new KafkaConfig(brokerProps);

我做错什么了?我试着和你玩

brokerProps.setProperty("security.inter.broker.protocol","PLAINTEXT");

但什么都没用。我只想为单元测试创建一个嵌入式kafka服务器。我根本不想设置任何安全措施。
这是我的Maven

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>
lokaqttq

lokaqttq1#

这不是代码问题。这是版本不匹配。您正在使用的库(https://gist.github.com/asmaier/6465468)是根据Kafka0.10.0.0编译的,这与您正在使用的Kafka版本()不同。这就是为什么它无法找到字段default\u sasl\u enabled\u mechanisms。你可能想看看下面links:-
盗用Zookeeper
Kafka
您只需将这些类复制到您的项目源代码中,或者您可以修改一些细节(比如删除schema registry)。这将比依赖于某个库(正如您当前所依赖的)更简单https://gist.github.com/asmaier/6465468 ),只是为了创建内存中的kafka集群。毕竟这只是几行代码。希望这有帮助。

相关问题