更新:
下面是我试图遵循的示例代码。
https://gist.github.com/asmaier/6465468
它应该适用于Kafka0.10.x
我正在尝试创建一个嵌入式Kafka服务器作为
EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
String zkConnect = ZKHOST + ":" + zkServer.port();
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
// setup Broker
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("listeners", "PLAINTEXT://:" + BROKERPORT);
brokerProps.setProperty("advertised.listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT);
//brokerProps.setProperty("security.inter.broker.protocol","PLAINTEXT");
KafkaConfig config = new KafkaConfig(brokerProps);
Time mock = new MockTime();
KafkaServer kafkaServer = TestUtils.createServer(config, mock);
logger.info("TestKafkaServer created");
但我明白了
java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS
at kafka.server.Defaults$.<init>(KafkaConfig.scala:183)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.log.Defaults$.<init>(LogConfig.scala:35)
at kafka.log.Defaults$.<clinit>(LogConfig.scala)
at kafka.log.LogConfig$.<init>(LogConfig.scala:246)
at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:270)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:795)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:797)
at com.ibm.whi.bap.helper.test.kafka.KafkaServerTest.<init>(KafkaServerTest.java:56)
at com.ibm.whi.bap.helper.test.kafka.KafkaTest.checkAllProperties(KafkaTest.java:115)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
java.lang.NullPointerException
at com.ibm.whi.bap.helper.test.kafka.KafkaTest.tearDown(KafkaTest.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37)
at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
在
KafkaConfig config = new KafkaConfig(brokerProps);
我做错什么了?我试着和你玩
brokerProps.setProperty("security.inter.broker.protocol","PLAINTEXT");
但什么都没用。我只想为单元测试创建一个嵌入式kafka服务器。我根本不想设置任何安全措施。
这是我的Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
1条答案
按热度按时间lokaqttq1#
这不是代码问题。这是版本不匹配。您正在使用的库(https://gist.github.com/asmaier/6465468)是根据Kafka0.10.0.0编译的,这与您正在使用的Kafka版本()不同。这就是为什么它无法找到字段default\u sasl\u enabled\u mechanisms。你可能想看看下面links:-
盗用Zookeeper
Kafka
您只需将这些类复制到您的项目源代码中,或者您可以修改一些细节(比如删除schema registry)。这将比依赖于某个库(正如您当前所依赖的)更简单https://gist.github.com/asmaier/6465468 ),只是为了创建内存中的kafka集群。毕竟这只是几行代码。希望这有帮助。