本文整理了Java中backtype.storm.Config.setNumWorkers()
方法的一些代码示例,展示了Config.setNumWorkers()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Config.setNumWorkers()
方法的具体详情如下:
包路径:backtype.storm.Config
类名称:Config
方法名:setNumWorkers
暂无
代码示例来源:origin: alibaba/jstorm
public void setNumWorkers(int workers) {
setNumWorkers(this, workers);
}
代码示例来源:origin: alibaba/jstorm
conf.setNumWorkers(hosts.size() + 3);
} else {
conf.setNumWorkers(hosts.size());
代码示例来源:origin: alibaba/jstorm
public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf,
int runtimeInSeconds, Callback callback) throws Exception {
if (conf.get(Config.TOPOLOGY_WORKERS) == null) {
conf.setNumWorkers(3);
}
StormSubmitter.submitTopology(topologyName, conf, topology);
if (JStormUtils.parseBoolean(conf.get("RUN_LONG_TIME"), false)) {
LOG.info(topologyName + " will run long time");
return;
}
if (runtimeInSeconds < 120) {
JStormUtils.sleepMs(120 * 1000);
} else {
JStormUtils.sleepMs(runtimeInSeconds * 1000);
}
if (callback != null) {
callback.execute(topologyName);
}
killTopology(conf, topologyName);
}
代码示例来源:origin: alibaba/jstorm
prepare();
conf.setNumWorkers(workerNumber);
代码示例来源:origin: alibaba/jstorm
conf.setNumWorkers(8);
代码示例来源:origin: alibaba/jstorm
conf.setNumWorkers(6);
if (args.length != 0) {
代码示例来源:origin: com.n3twork.storm/storm-core
public void setNumWorkers(int workers) {
setNumWorkers(this, workers);
}
代码示例来源:origin: com.twitter.heron/heron-storm
public void setNumWorkers(int workers) {
setNumWorkers(this, workers);
}
代码示例来源:origin: com.alibaba.jstorm/jstorm-core
public void setNumWorkers(int workers) {
setNumWorkers(this, workers);
}
代码示例来源:origin: eshioji/trident-tutorial
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setNumWorkers(6);
if (args.length == 2) {
// Ready & submit the topology
String name = args[0];
BrokerHosts hosts = new ZkHosts(args[1]);
TransactionalTridentKafkaSpout kafkaSpout = TestUtils.testTweetSpout(hosts);
StormSubmitter.submitTopology(name, conf, buildTopology(kafkaSpout));
}else{
System.err.println("<topologyName> <zookeeperHost>");
}
}
代码示例来源:origin: menacher/nerdronix
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, createTopology());
LATCH.await();
System.out.println("Processing complete");
cluster.killTopology("test");
cluster.shutdown();
}
代码示例来源:origin: openimaj/openimaj
@Override
public Config prepareConfig() {
if (preparedConfig == null) {
preparedConfig = new Config();
preparedConfig.setMaxSpoutPending(500);
preparedConfig.setNumWorkers(numberOfWorkers);
preparedConfig.setFallBackOnJavaSerialization(false);
preparedConfig.setSkipMissingKryoRegistrations(false);
JenaStormUtils.registerSerializers(preparedConfig);
}
return preparedConfig;
}
代码示例来源:origin: org.jwall/streams-storm
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Missing XML definition (base64 encoded)!");
return;
}
Document doc = DocumentEncoder.decodeDocument(args[0]);
Config conf = new Config();
conf.setNumWorkers(20);
StreamTopology streamTop = build(doc, new TopologyBuilder());
StormTopology topology = streamTop.getTopologyBuilder()
.createTopology();
StormSubmitter.submitTopology("test", conf, topology);
}
}
代码示例来源:origin: pranab/chombo
/**
* @param topologyName
* @param conf
* @param builder
* @throws AlreadyAliveException
* @throws InvalidTopologyException
*/
public static void submitStormTopology(String topologyName, Config conf, TopologyBuilder builder)
throws AlreadyAliveException, InvalidTopologyException {
int numWorkers = ConfigUtility.getInt(conf, "num.workers", 1);
int maxSpoutPending = ConfigUtility.getInt(conf, "max.spout.pending", 1000);
int maxTaskParalleism = ConfigUtility.getInt(conf, "max.task.parallelism", 100);
conf.setNumWorkers(numWorkers);
conf.setMaxSpoutPending(maxSpoutPending);
conf.setMaxTaskParallelism(maxTaskParalleism);
StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
}
代码示例来源:origin: jasonTangxd/recommendSys
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new DemoSpout(), 1);
//builder.setBolt("bolt", new RecommenderBolt(), 1).shuffleGrouping("spout");
builder.setBolt("detailBolt", new RecommenderDetailBolt(), 1).shuffleGrouping("spout");
builder.setBolt("statBolt", new RecommenderStatBolt(), 2).fieldsGrouping("detailBolt", new Fields("userId"));
Config config = new Config();
//这一行代码表示让spout每10秒发送一个特殊的tuple 此tuple的 SourceComponent就是 SYSTEM_COMPONENT_ID = "__system"
//config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
config.setDebug(true);
//提交远程
if (args != null && args.length > 0) {
config.setNumWorkers(3); // use three worker processes
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
}
else {
//提交本地模式
LocalCluster cluster = new LocalCluster();
config.setNumWorkers(3);
cluster.submitTopology("recommenderTopology", config, builder.createTopology());
}
}
代码示例来源:origin: apache/eagle
@Ignore
@Test
public void testRoutingByStreamId() throws Exception {
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint
topologyBuilder.setBolt("green-bolt-1", new GreenBolt(1))
.shuffleGrouping("blue-spout", "green-bolt-stream-1");
topologyBuilder.setBolt("green-bolt-2", new GreenBolt(2))
.shuffleGrouping("blue-spout", "green-bolt-stream-2");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
while (true) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码示例来源:origin: eshioji/trident-tutorial
public static void main(String[] args) throws Exception {
Config conf = new Config();
// Submits the topology
String topologyName = args[0];
conf.setNumWorkers(8); // Our Vagrant environment has 8 workers
FakeTweetsBatchSpout fakeTweets = new FakeTweetsBatchSpout(10);
TridentTopology topology = new TridentTopology();
TridentState countState =
topology
.newStream("spout", fakeTweets)
.groupBy(new Fields("actor"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
topology
.newDRPCStream("count_per_actor")
.stateQuery(countState, new Fields("args"), new MapGet(), new Fields("count"));
StormSubmitter.submitTopology(topologyName, conf, topology.build());
}
代码示例来源:origin: openimaj/openimaj
public static void main(String[] args) {
final Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(2);
conf.setMaxSpoutPending(1);
conf.setFallBackOnJavaSerialization(false);
conf.setSkipMissingKryoRegistrations(false);
final LocalCluster cluster = new LocalCluster();
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("randomSpout1", new RandomFieldSpout(2, 0, 0, 1)); // (nfields,seed,min,max)
builder.setSpout("randomSpout2", new RandomFieldSpout(2, 10, 0, 1)); // (nfields,seed,min,max)
JoinBolt.connectNewBolt(builder);
final StormTopology topology = builder.createTopology();
cluster.submitTopology("playTopology", conf, topology);
Utils.sleep(10000);
cluster.killTopology("playTopology");
cluster.shutdown();
}
}
代码示例来源:origin: org.openimaj.storm/core-storm
public static void main(String[] args) {
final Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(2);
conf.setMaxSpoutPending(1);
conf.setFallBackOnJavaSerialization(false);
conf.setSkipMissingKryoRegistrations(false);
final LocalCluster cluster = new LocalCluster();
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("randomSpout1", new RandomFieldSpout(2, 0, 0, 1)); // (nfields,seed,min,max)
builder.setSpout("randomSpout2", new RandomFieldSpout(2, 10, 0, 1)); // (nfields,seed,min,max)
JoinBolt.connectNewBolt(builder);
final StormTopology topology = builder.createTopology();
cluster.submitTopology("playTopology", conf, topology);
Utils.sleep(10000);
cluster.killTopology("playTopology");
cluster.shutdown();
}
}
代码示例来源:origin: calrissian/flowmix
public void run() {
StormTopology topology = new FlowmixBuilder()
.setFlowLoader(new SimpleFlowLoaderSpout(provider.getFlows(), 60000))
.setEventsLoader(new MockEventGeneratorSpout(getMockEvents(), 10))
.setOutputBolt(new PrinterBolt())
.setParallelismHint(6)
.create()
.createTopology();
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
conf.setDebug(false);
conf.registerSerialization(BaseEvent.class, EventSerializer.class);
conf.setSkipMissingKryoRegistrations(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("example-topology", conf, topology);
}
内容来源于网络,如有侵权,请联系作者删除!