我使用 LinearDRPCTopologyBuilder 来构建拓扑。
如果:
在构建器的第二个位置(第一个位置工作正常),我放置了一个 RichBolt,然后
在其他线程中执行处理和确认,
最后一个元组的ack失败:
java.lang.IllegalStateException: Coordination condition met on a non-coordinating tuple. Should be impossible
at org.apache.storm.coordination.CoordinatedBolt.checkFinishId(CoordinatedBolt.java:131)
at org.apache.storm.coordination.CoordinatedBolt.access$200(CoordinatedBolt.java:44)
at org.apache.storm.coordination.CoordinatedBolt$CoordinatedOutputCollector.ack(CoordinatedBolt.java:333)
at org.apache.storm.task.OutputCollector.ack(OutputCollector.java:181)
at Reproduce$ExclaimBolt.lambda$execute$0(Reproduce.java:86)
问题:我创建的设置有什么问题?
下面是重现该问题的完整代码示例。已在 Storm 2.1.0 和 0.9.5 下测试。
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBatchBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.storm.utils.Utils.DEFAULT_STREAM_ID;
/**
 * Minimal reproduction of an {@code IllegalStateException}
 * ("Coordination condition met on a non-coordinating tuple") thrown from
 * {@code CoordinatedBolt.checkFinishId} when a rich bolt placed at the
 * SECOND position of a {@code LinearDRPCTopologyBuilder} pipeline acks its
 * tuples from a thread other than the Storm executor thread.
 *
 * NOTE(review): LinearDRPCTopologyBuilder wraps every added bolt in a
 * CoordinatedBolt, whose collector maintains per-request coordination
 * counters. Emitting/acking from a separate thread (as both rich bolts
 * below do via EXECUTOR_SERVICE) appears to race those counters — likely
 * the trigger of the exception. Confirm against the Storm documentation on
 * whether collectors may be used off the executor thread.
 */
public class Reproduce {
    // Single shared worker thread; both rich bolts hand their emit/ack work
    // to it, moving that work off the Storm executor thread.
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

    /**
     * Builds a three-stage linear DRPC topology, submits it to a local
     * cluster, executes a single DRPC request, prints the result, and
     * shuts everything down.
     */
    public static void main(String[] args) throws Exception {
        LocalDRPC localDRPC = new LocalDRPC();
        LinearDRPCTopologyBuilder topologyBuilder = new LinearDRPCTopologyBuilder("abc");
        // Stage 1: rich bolt with async emit/ack — reported to work fine here.
        topologyBuilder.addBolt(new SplitSentence()).shuffleGrouping(DEFAULT_STREAM_ID);
        // Stage 2: rich bolt with async emit/ack — this is where the
        // "non-coordinating tuple" exception is raised (see stack trace above).
        topologyBuilder.addBolt(new ExclaimBolt()).shuffleGrouping(DEFAULT_STREAM_ID);
        // Stage 3: batch bolt aggregating per-request output, keyed by request id.
        topologyBuilder.addBolt(new ResultBolt()).fieldsGrouping(DEFAULT_STREAM_ID, new Fields("id"));
        StormTopology topology = topologyBuilder.createLocalTopology(localDRPC);
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", new HashMap<>(), topology);
        String result = localDRPC.execute("abc", "some sentence");
        System.out.println("Result: " + result);
        localCluster.shutdown();
        localDRPC.shutdown();
    }

    /**
     * Stage 1: splits the request sentence into words, emitting one
     * (id, word) tuple per word. Emit and ack happen asynchronously on
     * EXECUTOR_SERVICE, anchored to the input tuple.
     */
    public static class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            // Work is deferred to another thread; execute() returns before
            // the tuple is emitted or acked.
            Runnable task = () -> {
                Object id = tuple.getValue(0);          // DRPC request id
                String sentence = tuple.getString(1);   // raw request argument
                for (String word : sentence.split(" ")) {
                    _collector.emit(tuple, new Values(id, word));
                }
                _collector.ack(tuple);
            };
            EXECUTOR_SERVICE.submit(task);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // NOTE(review): this bolt emits words but declares the field
            // "result", while ExclaimBolt declares "word" — the two names
            // look swapped. Harmless for this repro, but worth confirming.
            declarer.declare(new Fields("id", "result"));
        }
    }

    /**
     * Stage 2: appends "!" to each incoming word. Also emits and acks on
     * EXECUTOR_SERVICE; the ack here is the one that throws
     * IllegalStateException (see the stack trace in the question).
     */
    public static class ExclaimBolt extends BaseRichBolt {
        OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            Runnable task = () -> {
                Object id = tuple.getValue(0);
                String input = tuple.getString(1);
                // Unanchored emit (no input tuple passed), unlike SplitSentence.
                collector.emit(new Values(id, input + "!"));
                try {
                    collector.ack(tuple);
                } catch (Throwable t) {
                    // Surface the CoordinatedBolt failure, then rethrow so the
                    // executor thread also sees it.
                    t.printStackTrace();
                    throw t;
                }
            };
            EXECUTOR_SERVICE.submit(task);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // See the field-naming note on SplitSentence: "word" here vs
            // "result" there appear swapped.
            declarer.declare(new Fields("id", "word"));
        }
    }

    /**
     * Stage 3: batch bolt that collects every word of one DRPC request and,
     * when the batch completes, emits a single (id, joined-words) tuple as
     * the DRPC result. Runs synchronously on the executor thread.
     */
    public static class ResultBolt extends BaseBatchBolt<Object> {
        private final List<String> words = new ArrayList<>();
        private BatchOutputCollector collector;
        private Object id;   // batch/request id supplied by the framework

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            words.add(tuple.getString(1));
        }

        @Override
        public void finishBatch() {
            collector.emit(new Values(id, words.toString()));
            // NOTE(review): clearing here suggests instance reuse across
            // batches is anticipated — confirm; BatchBolt instances are
            // typically created per batch.
            words.clear();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }
}
暂无答案!
目前还没有任何答案,快来回答吧!