有没有办法在trident中动态创建拓扑?有人能举个例子吗?
kpbwa7wx1#
首先,您可能还知道创建拓扑不是trident的一部分。三叉戟只是一个微博客的api。从定义上讲,创建新拓扑是动态的。这就是 TopologyBuilder 课堂正在做。所以要回答你的问题,是的,有可能创造新的拓扑三叉戟,或从简单的风暴喷口和螺栓。您唯一需要的是,您的拓扑创建逻辑应该能够访问storm集群(类和其他资源),如果您在storm中运行您的逻辑,那么从定义上讲,这也是可以满足的。您最不需要的就是找到一种提交新创建的拓扑的方法,这就是 StormSubmitter 课程是为,这又是(!惊讶:))当您在三叉戟或普通的喷口/螺栓中运行逻辑时,满足于在类路径上的定义。出于好奇,你为什么要这么做?你的要求是什么?例子:
TopologyBuilder
StormSubmitter
import java.util.Map; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.IBatchSpout; import org.apache.storm.tuple.Fields; public class DynamicTopologySpout implements IBatchSpout { private static final long serialVersionUID = -3269435263455830842L; @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context) {} @Override public void emitBatch(long batchId, TridentCollector collector) { if (newTopologyNeeded()) { TopologyBuilder builder = new TopologyBuilder(); builder .setSpout("spout", new BaseRichSpout() { private static final long serialVersionUID = 1L; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} @Override public void nextTuple() {} }, 1) .setMaxSpoutPending(15) .setNumTasks(1); StormTopology topology = builder.createTopology(); Config config = new Config(); try { StormSubmitter.submitTopology("dynamic-topology", config, topology); } catch (Exception e) { e.printStackTrace(); collector.reportError(e); } } } private boolean newTopologyNeeded() { // Check if topology needed ... return false; } @Override public void ack(long batchId) {} @Override public void close() {} @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public Fields getOutputFields() { return null; } }
1条答案
按热度按时间kpbwa7wx1#
首先,您可能还知道创建拓扑不是trident的一部分。三叉戟只是一个微博客的api。
从定义上讲,创建新拓扑是动态的。这就是
TopologyBuilder
课堂正在做。所以要回答你的问题,是的,有可能创造新的拓扑三叉戟,或从简单的风暴喷口和螺栓。您唯一需要的是,您的拓扑创建逻辑应该能够访问storm集群(类和其他资源),如果您在storm中运行您的逻辑,那么从定义上讲,这也是可以满足的。
您最不需要的就是找到一种提交新创建的拓扑的方法,这就是
StormSubmitter
课程是为,这又是(!惊讶:))当您在三叉戟或普通的喷口/螺栓中运行逻辑时,满足于在类路径上的定义。出于好奇,你为什么要这么做?你的要求是什么?
例子: