我想示例化一个localcluster(),防止它运行自己的嵌入式zookeeper,而使用我的。
关于这个问题:“https://issues.apache.org/jira/browse/storm-213“已为版本0.9.3解析。
能给我一个示例代码吗?
ps:我正在集成测试我的storm拓扑,我使用kafka和zookeeper作为storm的输入。当我没有将zookeeper信息指定给localcluster时,我在“localcluster localcluster=new localcluster()”行得到这个异常:
2016-06-08 12:16:56,785 WARN [Thread-30] jmx.MBeanRegistry (MBeanRegistry.java:register(100)) - Failed to register MBean StandaloneServer_port-1
2016-06-08 12:16:56,785 WARN [Thread-30] server.ZooKeeperServer (ZooKeeperServer.java:registerJMX(387)) - Failed to register with JMX
javax.management.InstanceAlreadyExistsException: org.apache.ZooKeeperService:name0=StandaloneServer_port-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.zookeeper.jmx.MBeanRegistry.register(MBeanRegistry.java:96)
at org.apache.zookeeper.server.ZooKeeperServer.registerJMX(ZooKeeperServer.java:377)
at org.apache.zookeeper.server.ZooKeeperServer.startup(ZooKeeperServer.java:410)
at org.apache.zookeeper.server.NIOServerCnxnFactory.startup(NIOServerCnxnFactory.java:123)
当我为本地集群指定“storm.zookeeper.servers”和“storm.zookeeper.port”时,我在“localcluster.submittopology()”行得到以下异常:
EndOfStreamException: Unable to read additional data from client sessionid 0x1552f0890b70000, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:26)
at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:301)
at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49)
at org.apache.storm.LocalCluster.submitTopology(Unknown Source)
2条答案
按热度按时间sqyvllje1#
你可以使用过载
LocalCluster
建造师。fnvucqvd2#
我能够使用kafka作为storm拓扑的输入,通过使用这个链接提供的配置http://storm.apache.org/releases/1.0.2/storm-kafka.html
我为storm配置创建了一个特定的java类
然后我在创建拓扑的初始喷口时使用了此类中的方法: