我是风暴的新手。我想使用一个名为“tileclean”的螺栓来发射单个 Stream
,和其他五个螺栓,以接收 Stream
同时。像这样:流图像
如您所见,“一,二,三,四,五”螺栓将同时收到相同的数据。但实际上,“一、二、三、四、五”螺栓不能接收任何数据。这是我的密码:
@Override
public void execute(TupleWindow inputWindow) {
logger.debug("clean");
List<Tuple> tuples = inputWindow.get();
//logger.debug("clean phrase. tuple size is : {}", tuples.size());
for (Tuple input : tuples) {
// some other code..
//this._collector.emit(input, new Values(nal));
this._collector.emit("stream_id_one", input, new Values(nal));
this._collector.emit("stream_id_two", input, new Values(nal));
this._collector.emit("stream_id_three", input, new Values(nal));
this._collector.emit("stream_id_four", input, new Values(nal));
this._collector.emit("stream_id_five", input, new Values(nal));
this._collector.ack(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_one", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_two", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_three", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_four", new Fields(BoltConstant.EMIT_LOGOBJ));
declarer.declareStream("stream_id_five", new Fields(BoltConstant.EMIT_LOGOBJ));
}
拓扑集为:
builder.setBolt("tileClean", cleanBolt, 1).shuffleGrouping("assembly");
builder.setBolt("OneBolt", OneBolt, 1).shuffleGrouping("tileClean", "stream_id_one");
builder.setBolt("TwoBolt", TwoBolt, 1).shuffleGrouping("tileClean", "stream_id_two");
builder.setBolt("ThreeBolt", ThreeBolt, 1).shuffleGrouping("tileClean", "stream_id_three");
builder.setBolt("FourBolt", FourBolt, 1).shuffleGrouping("tileClean", "stream_id_four");
builder.setBolt("FiveBolt", FiveBolt, 1).shuffleGrouping("tileClean", "stream_id_five");
``` `tileClean` 可以接收从 `assymble` ,但其他螺栓不能接受。
我的代码有什么错误吗?
1条答案
按热度按时间unguejic1#
由于省略了“for loop”语句和第一个collector.emit语句之间的代码,因此消息无法通过的可能性之一是在省略的代码之间进行正确的错误处理。您可以确保在“collector.emit”语句之前记录try catch block或debug,以检查代码是否确实到达了那里。
以上内容也可以在storm ui上进行检查,在storm ui中,它将显示在喷口/螺栓之间传输消息的拓扑度量。它还报告任务执行之间可能发生的任何错误消息。
另一种可能性是,如果您使用的是多节点群集,如果您的任务分布在该节点上(即,如果您在拓扑配置中分配了一个以上的worker),请确保这些计算机可以在storm.yaml文件中配置的指定端口上通过网络相互通信。