Apache Storm spout emits faster than the bolt can compute, causing ConcurrentModificationException

Posted by bn31dyow on 2021-06-21 in Storm

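Here is my situation. I have a Kafka spout that emits one tuple per second. The bolt connects to Rserve and needs about 5 seconds to compute a result, after which it emits a tuple. But the bolt always hits a ConcurrentModificationException, and then the worker dies!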

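import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.rosuda.REngine.Rserve.RConnection;

// Pair, Model, Parameter and RserveConnection are the poster's own classes (not shown).
public class LagBolt extends BaseRichBolt {

    OutputCollector _collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void execute(Tuple tuple) {
        // Copy the values out of the incoming list into a plain array for R.
        List<Pair> pairs = (List<Pair>) tuple.getValueByField("pairs");
        double[] value = new double[pairs.size()];
        for (int i = 0; i < pairs.size(); i++) {
            value[i] = pairs.get(i).getValue();
        }

        Model model = new Model();
        RConnection rConnection = new RserveConnection().getrConnection();
        List<Parameter> parameters;
        try {
            // test() and estimate() need about five seconds to compute
            int d = model.test(rConnection, value);
            int[] ap = model.estimate(rConnection, value, d);
            parameters = delegate(ap, d);
        } finally {
            // Close the Rserve connection even if the computation throws.
            rConnection.close();
        }

        // Anchor the output to the input tuple, then acknowledge the input.
        _collector.emit(tuple, new Values(parameters));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("parameters"));
    }

    // ap[0] holds a count apNum; ap[1..apNum] are paired with every element of ap[apNum+1..].
    private List<Parameter> delegate(int[] ap, int d) {
        int apNum = ap[0];
        List<Parameter> parameters = new ArrayList<Parameter>();

        for (int i = 1; i < 1 + apNum; i++) {
            for (int j = apNum + 1; j < ap.length; j++) {
                Parameter parameter = new Parameter(ap[i], d, ap[j]);
                parameters.add(parameter);
            }
        }

        return parameters;
    }
}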

Here is the exception:
379632 [Thread-14-lag] ERROR b.s.util - Async loop died!
java.lang.RuntimeException: java.util.ConcurrentModificationException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91]
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1169) ~[?:1.7.0_91]
    at java.util.ArrayList$SubList.listIterator(ArrayList.java:1049) ~[?:1.7.0_91]
    at java.util.AbstractList.listIterator(AbstractList.java:299) ~[?:1.7.0_91]
    at java.util.ArrayList$SubList.iterator(ArrayList.java:1045) ~[?:1.7.0_91]
    at java.util.AbstractCollection.toString(AbstractCollection.java:450) ~[?:1.7.0_91]
    at java.lang.String.valueOf(String.java:2849) ~[?:1.7.0_91]
    at java.lang.StringBuilder.append(StringBuilder.java:128) ~[?:1.7.0_91]
    at java.util.AbstractCollection.toString(AbstractCollection.java:458)
    at backtype.storm.tuple.TupleImpl.toString(TupleImpl.java:222) ~[storm-core-0.10.0.jar:0.10.0]
    at clojure.core$str.invoke(core.clj:520) ~[clojure-1.6.0.jar:?]
    at clojure.core$str$fn__3928.invoke(core.clj:524) ~[clojure-1.6.0.jar:?]
    at clojure.core$str.doInvoke(core.clj:526) ~[clojure-1.6.0.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:516) ~[clojure-1.6.0.jar:?]
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:693) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0]
    ... 6 more
379677 [Thread-14-lag] ERROR b.s.d.executor -
java.lang.RuntimeException: java.util.ConcurrentModificationException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91]
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1169) ~[?:1.7.0_91]
    at java.util.ArrayList$SubList.listIterator(ArrayList.java:1049) ~[?:1.7.0_91]
    at java.util.AbstractList.listIterator(AbstractList.java:299) ~[?:1.7.0_91]
    at java.util.ArrayList$SubList.iterator(ArrayList.java:1045) ~[?:1.7.0_91]
    at java.util.AbstractCollection.toString(AbstractCollection.java:450) ~[?:1.7.0_91]
    at java.lang.String.valueOf(String.java:2849) ~[?:1.7.0_91]
    at java.lang.StringBuilder.append(StringBuilder.java:128) ~[?:1.7.0_91]
    at java.util.AbstractCollection.toString(AbstractCollection.java:458)
    at backtype.storm.tuple.TupleImpl.toString(TupleImpl.java:222) ~[storm-core-0.10.0.jar:0.10.0]
    at clojure.core$str.invoke(core.clj:520) ~[clojure-1.6.0.jar:?]
    at clojure.core$str$fn__3928.invoke(core.clj:524) ~[clojure-1.6.0.jar:?]
    at clojure.core$str.doInvoke(core.clj:526) ~[clojure-1.6.0.jar:?]
    at clojure.lang.RestFn.invoke(RestFn.java:516) ~[clojure-1.6.0.jar:?]
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:693) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0]
    ... 6 more
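
One possible reading of the trace, offered as a guess because the spout code is not shown: the innermost frames go through ArrayList$SubList.checkForComodification, reached from TupleImpl.toString. That would fit a tuple whose "pairs" value is a subList view of a list that was structurally modified after the emit. The sketch below reproduces that behaviour in plain Java, without Storm, and shows a defensive copy. SubListDemo, buffer and window are hypothetical names used only for illustration, not taken from the topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SubListDemo {

    /** Returns true if reading a subList view fails after its backing list changes. */
    static boolean viewBreaks() {
        List<Integer> buffer = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        List<Integer> window = buffer.subList(0, 3); // a view over buffer, not a copy
        buffer.add(5);                               // structural change to the backing list
        try {
            window.toString();                       // iterates the view, as TupleImpl.toString does in the trace
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Copies the window first, so later changes to the buffer do not affect it. */
    static List<Integer> copyOfWindow() {
        List<Integer> buffer = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        List<Integer> window = new ArrayList<>(buffer.subList(0, 3)); // independent copy
        buffer.add(5);
        return window;
    }

    public static void main(String[] args) {
        System.out.println("view breaks: " + viewBreaks());
        System.out.println("copied window: " + copyOfWindow());
    }
}
```

If the spout does build "pairs" this way, emitting new ArrayList<>(buffer.subList(...)) instead of the view itself would be the analogous change there. Whether that applies depends on spout code that is not part of the question.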
