java.nio.channels.ClosedChannelException when consuming messages from a Storm Kafka spout

zysjyyx4  posted on 2021-06-06  in  Kafka

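I have written a Storm topology that uses the Kafka spout to fetch data from Kafka. It runs fine in my local environment, but on the cluster I get the following error: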
2018-05-16 18:25:59.358 o.a.s.k.ZkCoordinator Thread-25-kafkaspout-executor[20 20] [INFO] Task [1/1] Refreshing partition manager connections
2018-05-16 18:25:59.359 o.a.s.k.DynamicBrokersReader Thread-25-kafkaspout-executor[20 20] [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=data ops, partitionMap={0=uat-datalake-node2.org:6667}}
2018-05-16 18:25:59.359 o.a.s.k.KafkaUtils Thread-25-kafkaspout-executor[20 20] [INFO] Task [1/1] assigned [Partition{host=uat-datalake-node2.org:6667, topic=data ops, partition=0}]
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaspout-executor[20 20] [INFO] Task [1/1] Deleted partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaspout-executor[20 20] [INFO] Task [1/1] New partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaspout-executor[20 20] [INFO] Task [1/1] Finished refreshing
2018-05-16 18:25:59.361 k.c.SimpleConsumer Thread-25-kafkaspout-executor[20 20] [INFO] Reconnect due to error:
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) [kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_]
2018-05-16 18:26:09.372 o.a.s.k.KafkaUtils Thread-25-kafkaspout-executor[20 20] [WARN] Network error when fetching messages:
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_]
2018-05-16 18:26:09.373 o.a.s.k.KafkaSpout Thread-25-kafkaspout-executor[20 20] [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_]
Caused by: java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.1.jar:1.0.1]
    ... 7 more

bkhjykvo  #1

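It looks like you are hitting a timeout when the Storm worker tries to read from the Kafka broker. Maybe the connection between them is flaky or slow?
That said, the stack trace seems to indicate that the consumer reconnects, so if this happens only rarely, you are probably just seeing a brief hiccup in the connection between the worker and Kafka.
If it happens often and you are sure the connection is stable, I would try asking on the Kafka mailing list at https://kafka.apache.org/contact. If you post your problem along with the Kafka version you are using, they may be able to tell you whether there is a known issue that could cause the consumer socket to time out.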
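
To check whether the link between a worker and the broker really is flaky or slow, one option is to time a few plain TCP connects from the worker host to the broker address shown in the log. The following is only a rough sketch: the class name `BrokerProbe` and the `probe` helper are made up for illustration, and the 10-second timeout is an assumption chosen to match the roughly 10-second gap between the reconnect (18:25:59.361) and the timeout (18:26:09.372) in the log above. It tests TCP reachability only, not the Kafka protocol itself.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerProbe {

    /**
     * Opens a TCP connection to host:port and returns the connect time in
     * milliseconds, or -1 if the connect fails or exceeds timeoutMs.
     */
    static long probe(String host, int port, int timeoutMs) {
        long start = System.nanoTime();
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            // Covers refused connections, unknown hosts and connect timeouts.
            return -1;
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: BrokerProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // Assumed timeout: 10 s, mirroring the gap seen in the log.
        for (int attempt = 1; attempt <= 5; attempt++) {
            long ms = probe(host, port, 10_000);
            System.out.println("attempt " + attempt + ": "
                    + (ms >= 0 ? ms + " ms" : "failed"));
        }
    }
}
```

Run from each worker node against the broker from the log, for example `java BrokerProbe uat-datalake-node2.org 6667` after compiling. Repeated failures or connect times of several seconds would point to the network rather than to Storm or Kafka.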
