I'm trying to run a simple project, shown below:
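import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    String kafkaTopic = "videoMonitor"; // only this topic hits the problem
    String spoutID = "VQMSpoutID";
    String zkRoot = "/VQMZkRoot";
    // ZooKeeper ensemble that the Kafka brokers register with
    BrokerHosts brokerHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, kafkaTopic, zkRoot, spoutID);
    spoutConfig.scheme = new SchemeAsMultiScheme(new MyScheme());
    MyKafkaSpout myKafkaSpout = new MyKafkaSpout(spoutConfig);
    builder.setSpout("my", myKafkaSpout);
    // Run the topology in-process instead of submitting it to a cluster
    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("topoName", new Config(), builder.createTopology());
}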
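import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.List;

public class MyScheme implements Scheme {
    @Override
    public List<Object> deserialize(byte[] ser) {
        // Print the payload size; concatenating the array itself only prints its identity
        System.out.println("------------------------------------------------------>" + ser.length + " bytes");
        List<Object> arrayList = new ArrayList<Object>();
        arrayList.add("nice"); // placeholder value: one entry for the single output field
        return arrayList;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("all");
    }
}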
import storm.kafka.KafkaSpout;

public class MyKafkaSpout extends KafkaSpout {
    public MyKafkaSpout(SpoutConfig spoutConf) {
        super(spoutConf);
    }
}
I'm trying to use KafkaSpout to read messages from a Kafka topic. However, the project fails with the following errors:
2016-08-16 15:00:12.565 [Thread-9-my] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka3:9092, 1=kafka4:9092, 2=kafka1:9092, 3=kafka2:9092, 4=kafka3:9092, 5=kafka4:9092, 6=kafka1:9092, 7=kafka2:9092}}
2016-08-16 15:00:12.566 [Thread-9-my] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=kafka3:9092, partition=0}, Partition{host=kafka4:9092, partition=1}, Partition{host=kafka1:9092, partition=2}, Partition{host=kafka2:9092, partition=3}, Partition{host=kafka3:9092, partition=4}, Partition{host=kafka4:9092, partition=5}, Partition{host=kafka1:9092, partition=6}, Partition{host=kafka2:9092, partition=7}]
2016-08-16 15:00:12.567 [Thread-9-my] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
2016-08-16 15:00:12.567 [Thread-9-my] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=kafka3:9092, partition=4}, Partition{host=kafka1:9092, partition=6}, Partition{host=kafka4:9092, partition=5}, Partition{host=kafka2:9092, partition=7}, Partition{host=kafka3:9092, partition=0}, Partition{host=kafka1:9092, partition=2}, Partition{host=kafka4:9092, partition=1}, Partition{host=kafka2:9092, partition=3}]
2016-08-16 15:00:12.641 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000c type:exists cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:/VQMZkRoot/VQMSpoutID/partition_4
2016-08-16 15:00:12.641 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000c type:exists cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:/VQMZkRoot/VQMSpoutID/partition_4
2016-08-16 15:00:12.641 [Thread-9-my-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000c, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,33,-101 request:: '/VQMZkRoot/VQMSpoutID/partition_4,F response::
2016-08-16 15:00:12.642 [Thread-9-my] INFO storm.kafka.PartitionManager - Read partition information from: /VQMZkRoot/VQMSpoutID/partition_4 --> null
2016-08-16 15:00:12.669 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092
2016-08-16 15:00:12.720 [Thread-9-my] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 1048576 (requested 1048576), SO_SNDBUF = 65536 (requested -1), connectTimeoutMs = 10000.
2016-08-16 15:00:12.768 [Thread-9-my] INFO kafka.consumer.SimpleConsumer - Reconnect due to error:
java.io.IOException: 远程主机强迫关闭了一个现有的连接。(In english means: Remote host closed an existing connection)
at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_91]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_91]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_91]
at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_91]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_91]
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:206) ~[na:1.8.0_91]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.8.0_91]
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.8.0_91]
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.0.0.jar:na]
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.11-0.9.0.1.jar:na]
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.11-0.9.0.1.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [kafka_2.11-0.9.0.1.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.11-0.9.0.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [kafka_2.11-0.9.0.1.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [kafka_2.11-0.9.0.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) [storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) [storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) [storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3371$fn__3386$fn__3415.invoke(executor.clj:565) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2016-08-16 15:00:12.768 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092
2016-08-16 15:00:12.770 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092
2016-08-16 15:00:12.809 [Thread-9-my] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 10000 (requested 10000), SO_RCVBUF = 1048576 (requested 1048576), SO_SNDBUF = 65536 (requested -1), connectTimeoutMs = 10000.
2016-08-16 15:00:12.845 [Thread-9-my] DEBUG kafka.consumer.SimpleConsumer - Disconnecting from kafka3:9092
2016-08-16 15:00:12.847 [Thread-9-my] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.io.IOException: 远程主机强迫关闭了一个现有的连接。(In english means: Remote host closed an existing connection)
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3371$fn__3386$fn__3415.invoke(executor.clj:565) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
omitted some...
2016-08-16 15:00:12.849 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:exists cxid:0x22 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my
2016-08-16 15:00:12.849 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:exists cxid:0x22 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my
2016-08-16 15:00:12.849 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 34,3 replyHeader:: 34,33,-101 request:: '/storm/errors/topoName-1-1471330807/my,F response::
2016-08-16 15:00:12.850 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:exists cxid:0x23 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807
2016-08-16 15:00:12.850 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:exists cxid:0x23 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807
2016-08-16 15:00:12.850 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 35,3 replyHeader:: 35,33,-101 request:: '/storm/errors/topoName-1-1471330807,F response::
2016-08-16 15:00:12.850 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:exists cxid:0x24 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors
2016-08-16 15:00:12.851 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:exists cxid:0x24 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors
2016-08-16 15:00:12.851 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 36,3 replyHeader:: 36,33,0 request:: '/storm/errors,F response:: s{9,9,1471330807639,1471330807639,0,0,0,0,1,0,9}
2016-08-16 15:00:12.854 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:create cxid:0x25 zxid:0x22 txntype:1 reqpath:n/a
2016-08-16 15:00:12.854 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:create cxid:0x25 zxid:0x22 txntype:1 reqpath:n/a
2016-08-16 15:00:12.855 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 37,1 replyHeader:: 37,34,0 request:: '/storm/errors/topoName-1-1471330807,#7,v{s{31,s{'world,'anyone}}},0 response:: '/storm/errors/topoName-1-1471330807
2016-08-16 15:00:12.857 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:create cxid:0x26 zxid:0x23 txntype:1 reqpath:n/a
2016-08-16 15:00:12.857 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:create cxid:0x26 zxid:0x23 txntype:1 reqpath:n/a
2016-08-16 15:00:12.857 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 38,1 replyHeader:: 38,35,0 request:: '/storm/errors/topoName-1-1471330807/my,#7,v{s{31,s{'world,'anyone}}},0 response:: '/storm/errors/topoName-1-1471330807/my
2016-08-16 15:00:12.860 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:create cxid:0x27 zxid:0x24 txntype:1 reqpath:n/a
2016-08-16 15:00:12.860 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:create cxid:0x27 zxid:0x24 txntype:1 reqpath:n/a
2016-08-16 15:00:12.861 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 39,1 replyHeader:: 39,36,0 request:: '/storm/errors/topoName-1-1471330807/my/e,#ffffffacffffffed05737201f636c6f6a7572652e6c616e672e50657273697374656e7441727261794d6170ffffffd02836ffffff8f21ffffffe4ffffffa0f2024c055f6d6574617401d4c636c6f6a7572652f6c616e672f4950657273697374656e744d61703b5b056172726179740135b4c6a6176612f6c616e672f4f626a6563743b787201b636c6f6...omit some,v{s{31,s{'world,'anyone}}},2 response:: '/storm/errors/topoName-1-1471330807/my/e0000000000
2016-08-16 15:00:12.865 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x15692266a45000b type:getChildren2 cxid:0x28 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my
2016-08-16 15:00:12.865 [SyncThread:0] DEBUG o.a.storm.zookeeper.server.FinalRequestProcessor - sessionid:0x15692266a45000b type:getChildren2 cxid:0x28 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/topoName-1-1471330807/my
2016-08-16 15:00:12.866 [Thread-6-SendThread(0:0:0:0:0:0:0:1:2000)] DEBUG org.apache.storm.zookeeper.ClientCnxn - Reading reply sessionid:0x15692266a45000b, packet:: clientPath:null serverPath:null finished:false header:: 40,12 replyHeader:: 40,36,0 request:: '/storm/errors/topoName-1-1471330807/my,F response:: v{'e0000000000},s{35,35,1471330812855,1471330812855,0,1,0,0,1,1,36}
2016-08-16 15:00:12.867 [Thread-9-my] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:493) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Process finished with exit code 1
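I'm sure I have set up the hosts correctly. Stranger still, if I build this project into a jar and deploy it to a Linux server, it runs fine.
I also suspect that something is wrong with the topic itself, because when I switch to a new topic, all of the code runs normally.
Can anyone help? Thanks!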
1 Answer
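I would first check that the topic has actually been created in Kafka/ZooKeeper: start ./zkCli.sh and run ls /brokers/topics. I had a connection error (similar to this socket connection error), and in my case the hosts were configured incorrectly. Check the host list with cat /etc/hosts and make sure the IP addresses and hostnames (including localhost) are configured correctly and that there are no extra hostnames. You can also verify this in the ZooKeeper CLI with get /brokers/ids/(broker id) and check the PLAINTEXT entry. The host listed there should match your hosts configuration.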
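The hosts check above can be sketched in plain Java. This is only a minimal sketch under stated assumptions, not part of Storm or Kafka: the class name BrokerEndpointCheck and its helpers are hypothetical, the endpoint strings are assumed to have the form PLAINTEXT://host:port, and you paste them in by hand from the get /brokers/ids/(broker id) output (the code does not talk to ZooKeeper). It reports whether the machine running the topology can resolve each advertised broker hostname.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

public class BrokerEndpointCheck {

    /** Host and port parsed from an endpoint string (hypothetical helper type). */
    public static final class HostPort {
        public final String host;
        public final int port;

        public HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Parses an endpoint of the assumed form "PLAINTEXT://host:port".
     * Throws IllegalArgumentException if host or port cannot be extracted.
     */
    public static HostPort parseEndpoint(String endpoint) {
        URI uri = URI.create(endpoint.trim());
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Not a scheme://host:port endpoint: " + endpoint);
        }
        return new HostPort(uri.getHost(), uri.getPort());
    }

    /** Returns true if this machine can resolve the host name (hosts file or DNS). */
    public static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: java BrokerEndpointCheck PLAINTEXT://host:port ...");
            return;
        }
        // Each argument is one endpoint string copied from ZooKeeper by hand.
        for (String endpoint : args) {
            HostPort hp = parseEndpoint(endpoint);
            String status = resolves(hp.host) ? "resolves" : "DOES NOT RESOLVE";
            System.out.println(hp.host + ":" + hp.port + " -> " + status);
        }
    }
}
```

After compiling, it could be run on the development machine as java BrokerEndpointCheck PLAINTEXT://kafka3:9092 PLAINTEXT://kafka4:9092, using the broker names that appear in the question's partition log. A hostname that fails to resolve there but resolves on the Linux server would point at the local hosts configuration rather than at the topic.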
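As you said, it works for other topics. That suggests the problem lies with this particular topic. I don't see anything else wrong. Still, I would love to know if it turns out to be something else.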