ack 失败,抛出 IllegalStateException

bogh5gae  于 2021-07-15  发布在  Storm
关注(0)|答案(0)|浏览(330)

我使用 LinearDRPCTopologyBuilder 来构建拓扑。
如果:
在构建器(builder)的第二个位置(放在第一个位置时工作正常)放一个 RichBolt(继承 BaseRichBolt),并且
在其他线程中执行处理和确认(ack),
那么最后一个元组的 ack 会失败:

java.lang.IllegalStateException: Coordination condition met on a non-coordinating tuple. Should be impossible
    at org.apache.storm.coordination.CoordinatedBolt.checkFinishId(CoordinatedBolt.java:131)
    at org.apache.storm.coordination.CoordinatedBolt.access$200(CoordinatedBolt.java:44)
    at org.apache.storm.coordination.CoordinatedBolt$CoordinatedOutputCollector.ack(CoordinatedBolt.java:333)
    at org.apache.storm.task.OutputCollector.ack(OutputCollector.java:181)
    at Reproduce$ExclaimBolt.lambda$execute$0(Reproduce.java:86)

问题:我搭建的这个拓扑有什么问题?
下面是可以重现该问题的完整代码示例。已在 Storm 2.1.0 和 0.9.5 下测试。

import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.storm.utils.Utils.DEFAULT_STREAM_ID;

public class Reproduce {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        LocalDRPC localDRPC = new LocalDRPC();

        LinearDRPCTopologyBuilder topologyBuilder = new LinearDRPCTopologyBuilder("abc");
        topologyBuilder.addBolt(new SplitSentence()).shuffleGrouping(DEFAULT_STREAM_ID);
        topologyBuilder.addBolt(new ExclaimBolt()).shuffleGrouping(DEFAULT_STREAM_ID);
        topologyBuilder.addBolt(new ResultBolt()).fieldsGrouping(DEFAULT_STREAM_ID, new Fields("id"));
        StormTopology topology = topologyBuilder.createLocalTopology(localDRPC);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", new HashMap<>(), topology);

        String result = localDRPC.execute("abc", "some sentence");
        System.out.println("Result: " + result);

        localCluster.shutdown();
        localDRPC.shutdown();
    }

    public static class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            Runnable task = () -> {
                Object id = tuple.getValue(0);
                String sentence = tuple.getString(1);
                for (String word : sentence.split(" ")) {
                    _collector.emit(tuple, new Values(id, word));
                }
                _collector.ack(tuple);
            };

            EXECUTOR_SERVICE.submit(task);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    public static class ExclaimBolt extends BaseRichBolt {
        OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            Runnable task = () -> {
                Object id = tuple.getValue(0);
                String input = tuple.getString(1);
                collector.emit(new Values(id, input + "!"));
                try {
                    collector.ack(tuple);
                } catch (Throwable t) {
                    t.printStackTrace();
                    throw t;
                }
            };

            EXECUTOR_SERVICE.submit(task);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "word"));
        }
    }

    public static class ResultBolt extends BaseBatchBolt<Object> {

        private final List<String> words = new ArrayList<>();

        private BatchOutputCollector collector;
        private Object id;

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            words.add(tuple.getString(1));
        }

        @Override
        public void finishBatch() {
            collector.emit(new Values(id, words.toString()));
            words.clear();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题