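I am trying to set up a Storm cluster that takes data from a Kafka bus and then processes it. So far I have only included a "PrinterBolt", which should simply print the messages. I have tried running the example topologies from this repository, and those work. Based on those examples, I wrote the following very basic topology: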
package storm.starter;

import java.util.Arrays;
import java.util.UUID;
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.StormSubmitter;
import storm.kafka.ZkHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaSpout;
import storm.starter.bolt.PrinterBolt;

public class KafkaTest {
    private static final String SPOUT_ID = "kafka-spout";
    private static final String TOPOLOGY_NAME = "kafka-topology-tester";

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpout kafkaSpout = buildKafkaSpout();
        builder.setSpout("kafka_test", kafkaSpout);
        builder.setBolt("print", new PrinterBolt())
               .shuffleGrouping("kafka_test");
        Config conf = new Config();
        StormSubmitter.submitTopology("kafka_test_top", conf, builder.createTopology());
    }

    private static KafkaSpout buildKafkaSpout() {
        String zkHostPort = "192.168.50.11.2181";
        String topic = "test";
        String zkRoot = "/brokers";
        String zkSpoutId = UUID.randomUUID().toString();
        ZkHosts zkHosts = new ZkHosts(zkHostPort);
        SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
        KafkaSpout spout = new KafkaSpout(spoutCfg);
        return spout;
    }
}
When I check the worker logs, I can see that something goes wrong:
2015-05-11T09:30:07.688+0000 b.s.d.executor [INFO] Finished loading executor __acker:[1 1]
2015-05-11T09:30:07.742+0000 b.s.d.worker [INFO] Worker has topology config {"storm.id" "kafka_test_top-5-1431336572", "dev.zookeeper.path" "/tmp/dev-sto$
2015-05-11T09:30:07.742+0000 b.s.d.worker [INFO] Worker 3f75535d-ab2d-4ed9-b5a3-312192d6416d for storm kafka_test_top-5-1431336572 on acedcb7d-758a-49a6-$
2015-05-11T09:30:07.836+0000 b.s.d.worker [INFO] All connections are ready for worker acedcb7d-758a-49a6-a128-6864c696c297:6701 with id 3f75535d-ab2d-4ed$
2015-05-11T09:30:07.953+0000 b.s.d.executor [INFO] Preparing bolt print:(3)
2015-05-11T09:30:07.962+0000 b.s.d.executor [INFO] Prepared bolt print:(3)
2015-05-11T09:30:08.005+0000 b.s.d.executor [INFO] Preparing bolt __acker:(1)
2015-05-11T09:30:08.007+0000 b.s.d.executor [INFO] Preparing bolt __system:(-1)
2015-05-11T09:30:08.009+0000 b.s.d.executor [INFO] Prepared bolt __acker:(1)
2015-05-11T09:30:08.011+0000 b.s.d.executor [INFO] Opening spout kafka_test:(2)
2015-05-11T09:30:08.022+0000 b.s.d.executor [INFO] Prepared bolt __system:(-1)
2015-05-11T09:30:08.088+0000 b.s.util [ERROR] Async loop died!
java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[stormjar.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[stormjar.jar:na]
at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[stormjar.jar:na]
at storm.kafka.ZkState.<init>(ZkState.java:61) ~[stormjar.jar:na]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__4654$fn__4669.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_79]
at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_79]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_79]
at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_79]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_79]
at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_79]
... 9 common frames omitted
2015-05-11T09:30:08.089+0000 b.s.d.executor [ERROR]
java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:122) ~[stormjar.jar:na]
at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:91) ~[stormjar.jar:na]
at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[stormjar.jar:na]
at storm.kafka.ZkState.<init>(ZkState.java:61) ~[stormjar.jar:na]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__4654$fn__4669.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_79]
at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_79]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_79]
at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_79]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_79]
at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_79]
... 9 common frames omitted
2015-05-11T09:30:08.173+0000 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__5102$fn__5103.invoke(worker.clj:495) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_executor_data$fn__4555$fn__4556.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
2015-05-11T09:30:08.200+0000 b.s.d.worker [INFO] Shutting down worker kafka_test_top-5-1431336572 acedcb7d-758a-49a6-a128-6864c696c297 6701
2015-05-11T09:30:08.205+0000 b.s.d.worker [INFO] Shutting down receive thread
2015-05-11T09:30:08.219+0000 b.s.m.n.Client [INFO] creating Netty Client, connecting to localhost:6701, bufferSize: 5242880
2015-05-11T09:30:08.219+0000 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
2015-05-11T09:30:08.219+0000 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
2015-05-11T09:30:08.226+0000 b.s.m.n.Client [INFO] connection attempt 1 to Netty-Client-localhost/127.0.0.1:6701 scheduled to run in 0 ms
2015-05-11T09:30:08.297+0000 b.s.m.loader [INFO] Shutting down receiving-thread: [kafka_test_top-5-1431336572, 6701]
2015-05-11T09:30:08.298+0000 b.s.m.n.Client [ERROR] connection to Netty-Client-localhost/127.0.0.1:6701 is unavailable
2015-05-11T09:30:08.298+0000 b.s.m.n.Client [ERROR] dropping 1 message(s) destined for Netty-Client-localhost/127.0.0.1:6701
2015-05-11T09:30:08.298+0000 b.s.m.n.Client [INFO] closing Netty Client Netty-Client-localhost/127.0.0.1:6701
2015-05-11T09:30:08.298+0000 b.s.m.n.Client [INFO] waiting up to 600000 ms to send 0 pending messages to Netty-Client-localhost/127.0.0.1:6701
2015-05-11T09:30:08.298+0000 b.s.m.loader [INFO] Waiting for receiving-thread:[kafka_test_top-5-1431336572, 6701] to die
2015-05-11T09:30:08.307+0000 b.s.m.loader [INFO] Shutdown receiving-thread: [kafka_test_top-5-1431336572, 6701]
2015-05-11T09:30:08.307+0000 b.s.d.worker [INFO] Shut down receive thread
2015-05-11T09:30:08.307+0000 b.s.d.worker [INFO] Terminating messaging context
2015-05-11T09:30:08.307+0000 b.s.d.worker [INFO] Shutting down executors
2015-05-11T09:30:08.314+0000 b.s.m.n.Client [INFO] connection established to Netty-Client-localhost/127.0.0.1:6701
2015-05-11T09:30:08.315+0000 b.s.d.executor [INFO] Shutting down executor kafka_test:[2 2]
2015-05-11T09:30:08.318+0000 b.s.util [INFO] Async loop interrupted!
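These errors occur as soon as I start sending messages over the Kafka bus. When no messages arrive, there is no problem, and the topology appears to be waiting for input. I run two nodes in the Storm cluster: one master node, running ZooKeeper (for Storm) and Nimbus, and one worker node, running the Storm supervisor.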
3 Answers
q9rjltbz #1
Try this: put zookeeper-.jar into path/apache-storm-1.0.2/extlib
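The `NoClassDefFoundError: org/apache/zookeeper/Watcher` in the question's log means the worker JVM could not find the ZooKeeper classes at runtime. Below is a minimal sketch of a classpath check. The class name `ClasspathCheck` and the helper `isClassAvailable` are hypothetical names chosen for illustration and are not part of Storm.

```java
public class ClasspathCheck {
    // Hypothetical helper: returns true if the named class can be located by
    // the class loader that loaded ClasspathCheck. The class is not initialized.
    static boolean isClassAvailable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace from the question.
        String name = args.length > 0 ? args[0] : "org.apache.zookeeper.Watcher";
        System.out.println(name + " available: " + isClassAvailable(name));
    }
}
```

Running this with the same jars the worker sees (an assumption about how you reproduce the worker's classpath) should indicate whether the ZooKeeper jar is actually missing before the topology is resubmitted.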
e3bfsja2 #2
These problems were caused by the topic not having been created on the Kafka broker.
zaq34kh6 #3
Your
String zkHostPort = "192.168.50.11.2181";
is misconfigured. It should be:
String zkHostPort = "192.168.50.11:2181";
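With a period instead of a colon, the whole string reads as a host name with no port. A small pre-flight check can catch this before the topology is submitted. The sketch below is a hedged illustration: `ZkHostPortCheck` and `isValidHostPort` are hypothetical names, and it only validates a single `host:port` entry, not a comma-separated list or a chroot suffix.

```java
public class ZkHostPortCheck {
    // Hypothetical helper: accepts one "host:port" entry and returns true
    // only if the host is non-empty and the port is a number in 1..65535.
    static boolean isValidHostPort(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            return false; // no colon, empty host, or empty port
        }
        try {
            int port = Integer.parseInt(hostPort.substring(idx + 1));
            return port >= 1 && port <= 65535;
        } catch (NumberFormatException e) {
            return false; // port part is not an integer
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidHostPort("192.168.50.11.2181")); // prints false
        System.out.println(isValidHostPort("192.168.50.11:2181")); // prints true
    }
}
```

A check like this could be called in `buildKafkaSpout()` before constructing `ZkHosts`, so a malformed address fails at submission time rather than inside the worker.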