backtype.storm.Config.setNumWorkers()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(8.7k)|赞(0)|评价(0)|浏览(123)

本文整理了Java中backtype.storm.Config.setNumWorkers()方法的一些代码示例,展示了Config.setNumWorkers()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Config.setNumWorkers()方法的具体详情如下:
包路径:backtype.storm.Config
类名称:Config
方法名:setNumWorkers

Config.setNumWorkers介绍

暂无

代码示例

代码示例来源:origin: alibaba/jstorm

public void setNumWorkers(int workers) {
  setNumWorkers(this, workers);
}

代码示例来源:origin: alibaba/jstorm

conf.setNumWorkers(hosts.size() + 3);
} else {
  conf.setNumWorkers(hosts.size());

代码示例来源:origin: alibaba/jstorm

public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf,
                    int runtimeInSeconds, Callback callback) throws Exception {
  if (conf.get(Config.TOPOLOGY_WORKERS) == null) {
    conf.setNumWorkers(3);
  }
  StormSubmitter.submitTopology(topologyName, conf, topology);
  if (JStormUtils.parseBoolean(conf.get("RUN_LONG_TIME"), false)) {
    LOG.info(topologyName + " will run long time");
    return;
  }
  if (runtimeInSeconds < 120) {
    JStormUtils.sleepMs(120 * 1000);
  } else {
    JStormUtils.sleepMs(runtimeInSeconds * 1000);
  }
  if (callback != null) {
    callback.execute(topologyName);
  }
  killTopology(conf, topologyName);
}

代码示例来源:origin: alibaba/jstorm

prepare();
conf.setNumWorkers(workerNumber);

代码示例来源:origin: alibaba/jstorm

conf.setNumWorkers(8);

代码示例来源:origin: alibaba/jstorm

conf.setNumWorkers(6);
if (args.length != 0) {

代码示例来源:origin: com.n3twork.storm/storm-core

public void setNumWorkers(int workers) {
  setNumWorkers(this, workers);
}

代码示例来源:origin: com.twitter.heron/heron-storm

public void setNumWorkers(int workers) {
 setNumWorkers(this, workers);
}

代码示例来源:origin: com.alibaba.jstorm/jstorm-core

public void setNumWorkers(int workers) {
  setNumWorkers(this, workers);
}

代码示例来源:origin: eshioji/trident-tutorial

public static void main(String[] args) throws Exception {
  Config conf = new Config();
  conf.setNumWorkers(6);
  if (args.length == 2) {
    // Ready & submit the topology
    String name = args[0];
    BrokerHosts hosts = new ZkHosts(args[1]);
    TransactionalTridentKafkaSpout kafkaSpout = TestUtils.testTweetSpout(hosts);
    StormSubmitter.submitTopology(name, conf, buildTopology(kafkaSpout));
  }else{
    System.err.println("<topologyName> <zookeeperHost>");
  }
}

代码示例来源:origin: menacher/nerdronix

public static void main(String[] args) throws InterruptedException {
  Config config = new Config();
  config.setDebug(true);
  config.setNumWorkers(2);
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology("test", config, createTopology());
  LATCH.await();
  System.out.println("Processing complete");
  cluster.killTopology("test");
  cluster.shutdown();
}

代码示例来源:origin: openimaj/openimaj

@Override
public Config prepareConfig() {
  if (preparedConfig == null) {
    preparedConfig = new Config();
    preparedConfig.setMaxSpoutPending(500);
    preparedConfig.setNumWorkers(numberOfWorkers);
    preparedConfig.setFallBackOnJavaSerialization(false);
    preparedConfig.setSkipMissingKryoRegistrations(false);
    JenaStormUtils.registerSerializers(preparedConfig);
  }
  return preparedConfig;
}

代码示例来源:origin: org.jwall/streams-storm

public static void main(String[] args) throws Exception {

    if (args.length != 1) {
      System.err.println("Missing XML definition (base64 encoded)!");
      return;
    }

    Document doc = DocumentEncoder.decodeDocument(args[0]);
    Config conf = new Config();
    conf.setNumWorkers(20);

    StreamTopology streamTop = build(doc, new TopologyBuilder());
    StormTopology topology = streamTop.getTopologyBuilder()
        .createTopology();

    StormSubmitter.submitTopology("test", conf, topology);
  }
}

代码示例来源:origin: pranab/chombo

/**
 * @param topologyName
 * @param conf
 * @param builder
 * @throws AlreadyAliveException
 * @throws InvalidTopologyException
 */
public static void submitStormTopology(String topologyName, Config conf,  TopologyBuilder builder) 
    throws AlreadyAliveException, InvalidTopologyException {
  int numWorkers = ConfigUtility.getInt(conf, "num.workers", 1);
  int maxSpoutPending = ConfigUtility.getInt(conf, "max.spout.pending", 1000);
  int maxTaskParalleism = ConfigUtility.getInt(conf, "max.task.parallelism", 100);
  conf.setNumWorkers(numWorkers);
  conf.setMaxSpoutPending(maxSpoutPending);
  conf.setMaxTaskParallelism(maxTaskParalleism);
  StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
}

代码示例来源:origin: jasonTangxd/recommendSys

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new DemoSpout(), 1);
  //builder.setBolt("bolt", new RecommenderBolt(), 1).shuffleGrouping("spout");
  builder.setBolt("detailBolt", new RecommenderDetailBolt(), 1).shuffleGrouping("spout");
  builder.setBolt("statBolt", new RecommenderStatBolt(), 2).fieldsGrouping("detailBolt", new Fields("userId"));
  Config config = new Config();
  //这一行代码表示让spout每10秒发送一个特殊的tuple 此tuple的 SourceComponent就是 SYSTEM_COMPONENT_ID = "__system"
  //config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
  
  config.setDebug(true);
  //提交远程
  if (args != null && args.length > 0) {
    config.setNumWorkers(3); // use three worker processes
   StormSubmitter.submitTopology(args[0], config, builder.createTopology());
  }
  else {
    //提交本地模式
    LocalCluster cluster = new LocalCluster();
    config.setNumWorkers(3);
    cluster.submitTopology("recommenderTopology", config, builder.createTopology());
  }
}

代码示例来源:origin: apache/eagle

@Ignore
@Test
public void testRoutingByStreamId() throws Exception {
  Config conf = new Config();
  conf.setNumWorkers(2); // use two worker processes
  TopologyBuilder topologyBuilder = new TopologyBuilder();
  topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint
  topologyBuilder.setBolt("green-bolt-1", new GreenBolt(1))
    .shuffleGrouping("blue-spout", "green-bolt-stream-1");
  topologyBuilder.setBolt("green-bolt-2", new GreenBolt(2))
    .shuffleGrouping("blue-spout", "green-bolt-stream-2");
  LocalCluster cluster = new LocalCluster();
  cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
  while (true) {
    try {
      Thread.sleep(1000);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

代码示例来源:origin: eshioji/trident-tutorial

public static void main(String[] args) throws Exception {
  Config conf = new Config();
  // Submits the topology
  String topologyName = args[0];
  conf.setNumWorkers(8); // Our Vagrant environment has 8 workers
  FakeTweetsBatchSpout fakeTweets = new FakeTweetsBatchSpout(10);
  TridentTopology topology = new TridentTopology();
  TridentState countState =
      topology
          .newStream("spout", fakeTweets)
          .groupBy(new Fields("actor"))
          .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
  topology
      .newDRPCStream("count_per_actor")
      .stateQuery(countState, new Fields("args"), new MapGet(), new Fields("count"));
  StormSubmitter.submitTopology(topologyName, conf, topology.build());
}

代码示例来源:origin: openimaj/openimaj

public static void main(String[] args) {
    final Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(2);
    conf.setMaxSpoutPending(1);
    conf.setFallBackOnJavaSerialization(false);
    conf.setSkipMissingKryoRegistrations(false);
    final LocalCluster cluster = new LocalCluster();
    final TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("randomSpout1", new RandomFieldSpout(2, 0, 0, 1)); // (nfields,seed,min,max)
    builder.setSpout("randomSpout2", new RandomFieldSpout(2, 10, 0, 1)); // (nfields,seed,min,max)
    JoinBolt.connectNewBolt(builder);
    final StormTopology topology = builder.createTopology();
    cluster.submitTopology("playTopology", conf, topology);
    Utils.sleep(10000);
    cluster.killTopology("playTopology");
    cluster.shutdown();

  }
}

代码示例来源:origin: org.openimaj.storm/core-storm

public static void main(String[] args) {
    final Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(2);
    conf.setMaxSpoutPending(1);
    conf.setFallBackOnJavaSerialization(false);
    conf.setSkipMissingKryoRegistrations(false);
    final LocalCluster cluster = new LocalCluster();
    final TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("randomSpout1", new RandomFieldSpout(2, 0, 0, 1)); // (nfields,seed,min,max)
    builder.setSpout("randomSpout2", new RandomFieldSpout(2, 10, 0, 1)); // (nfields,seed,min,max)
    JoinBolt.connectNewBolt(builder);
    final StormTopology topology = builder.createTopology();
    cluster.submitTopology("playTopology", conf, topology);
    Utils.sleep(10000);
    cluster.killTopology("playTopology");
    cluster.shutdown();

  }
}

代码示例来源:origin: calrissian/flowmix

public void run() {
 StormTopology topology = new FlowmixBuilder()
   .setFlowLoader(new SimpleFlowLoaderSpout(provider.getFlows(), 60000))
   .setEventsLoader(new MockEventGeneratorSpout(getMockEvents(), 10))
   .setOutputBolt(new PrinterBolt())
   .setParallelismHint(6)
  .create()
 .createTopology();
 Config conf = new Config();
 conf.setNumWorkers(20);
 conf.setMaxSpoutPending(5000);
 conf.setDebug(false);
 conf.registerSerialization(BaseEvent.class, EventSerializer.class);
 conf.setSkipMissingKryoRegistrations(false);
 LocalCluster cluster = new LocalCluster();
 cluster.submitTopology("example-topology", conf, topology);
}

相关文章