动态创建拓扑

vfwfrxfs  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(273)

有没有办法在trident中动态创建拓扑?有人能举个例子吗?

kpbwa7wx

kpbwa7wx1#

首先,您可能还知道创建拓扑不是trident的一部分。三叉戟只是一个微博客的api。
从定义上讲,创建新拓扑是动态的。这就是 TopologyBuilder 课堂正在做。
所以要回答你的问题,是的,有可能创造新的拓扑三叉戟,或从简单的风暴喷口和螺栓。您唯一需要的是,您的拓扑创建逻辑应该能够访问storm集群(类和其他资源),如果您在storm中运行您的逻辑,那么从定义上讲,这也是可以满足的。
您最不需要的就是找到一种提交新创建的拓扑的方法,这就是 StormSubmitter 课程是为,这又是(!惊讶:))当您在三叉戟或普通的喷口/螺栓中运行逻辑时,满足于在类路径上的定义。
出于好奇,你为什么要这么做?你的要求是什么?
例子:

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

public class DynamicTopologySpout implements IBatchSpout {

    private static final long serialVersionUID = -3269435263455830842L;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context) {}

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        if (newTopologyNeeded()) {
            TopologyBuilder builder = new TopologyBuilder();
            builder
            .setSpout("spout", new BaseRichSpout() {
                private static final long serialVersionUID = 1L;
                @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
                @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
                @Override public void nextTuple() {}
            }, 1)
            .setMaxSpoutPending(15)
            .setNumTasks(1);
            StormTopology topology = builder.createTopology();
            Config config = new Config();
            try {
                StormSubmitter.submitTopology("dynamic-topology", config, topology);
            } catch (Exception e) {
                e.printStackTrace();
                collector.reportError(e);
            }
        }
    }

    private boolean newTopologyNeeded() {
        // Check if topology needed ...
        return false;
    }

    @Override
    public void ack(long batchId) {}

    @Override
    public void close() {}

    @Override
    public Map<String, Object> getComponentConfiguration() { return null; }

    @Override
    public Fields getOutputFields() { return null; }

}

相关问题