本文整理了Java中org.apache.storm.Config.<init>()
方法的一些代码示例,展示了Config.<init>()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Config.<init>()
方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:<init>
暂无
代码示例来源:origin: apache/storm
/**
* {@inheritDoc}
*/
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
// add tick tuple each second to force acknowledgement of pending tuples.
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
return conf;
}
代码示例来源:origin: apache/storm
/**
* {@inheritDoc}
*/
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
return conf;
}
代码示例来源:origin: apache/storm
private Config newConfig() {
Config config = new Config();
config.setDebug(true);
return config;
}
代码示例来源:origin: apache/storm
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
}
}
代码示例来源:origin: apache/storm
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}
代码示例来源:origin: apache/storm
/**
* Given a topology definition, return a populated `org.apache.storm.Config` instance.
* @param topologyDef topology definition
* @return a Storm Config object
*/
public static Config buildConfig(TopologyDef topologyDef) {
// merge contents of `config` into topology config
Config conf = new Config();
conf.putAll(topologyDef.getConfig());
return conf;
}
代码示例来源:origin: apache/storm
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = super.getComponentConfiguration();
if (conf == null) {
conf = new Config();
}
if (options.getTickTupleInterval() > 0) {
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
}
return conf;
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
Config conf = new Config();
StormSubmitter.submitTopology("reach", conf, buildTopology());
try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
Thread.sleep(2000);
System.out.println("REACH: " + drpc.execute("reach", "aaa"));
System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));
}
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
StormTopology topology = buildVehiclesTopology();
Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
}
代码示例来源:origin: apache/storm
private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("id", "msg", "city", "state");
}
};
return new TupleImpl(topologyContext, new Values(id, msg, city, state), "", 1, "");
}
代码示例来源:origin: apache/storm
public Cluster makeCluster(TopologyDetails topo) {
Topologies topologies = new Topologies(topo);
Map<String, SupervisorDetails> supMap = genSupervisors(4, 2, 120, 1200);
return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, new Config());
}
代码示例来源:origin: apache/storm
@Test
public void testInvalidConfig() {
Config conf = new Config();
ArtifactoryConfigLoaderMock loaderMock = new ArtifactoryConfigLoaderMock(conf);
Map<String, Object> ret = loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
Assert.assertNull("Unexpectedly returned not null", ret);
}
代码示例来源:origin: apache/storm
public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {
Config conf = new Config();
conf.putAll(config);
conf.put(Config.TOPOLOGY_NAME, name);
StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
3, genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime, "user");
return topo;
}
代码示例来源:origin: apache/storm
@Test
public void testTwoTuplesTwoFiles() throws IOException {
HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .00001f);
bolt.prepare(new Config(), topologyContext, collector);
bolt.execute(tuple1);
bolt.execute(tuple2);
verify(collector).ack(tuple1);
verify(collector).ack(tuple2);
Assert.assertEquals(2, countNonZeroLengthFiles(testRoot));
}
代码示例来源:origin: apache/storm
@Test
public void testFileNotThere() {
Config conf = new Config();
conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX + "/file/not/exist/");
FileConfigLoader testLoader = new FileConfigLoader(conf);
Map<String, Object> result = testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
Assert.assertNull("Unexpectedly returned a map", result);
}
代码示例来源:origin: apache/storm
@Test
public void testFailedSync() throws IOException {
HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f);
bolt.prepare(new Config(), topologyContext, collector);
bolt.execute(tuple1);
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
// All writes/syncs will fail so this should cause a RuntimeException
thrown.expect(RuntimeException.class);
bolt.execute(tuple1);
}
代码示例来源:origin: apache/storm
@Test
@IntegrationTest
public void testCompleteTopologyNettySimulated() throws Exception {
Config daemonConf = new Config();
daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
MkClusterParam param = new MkClusterParam();
param.setSupervisors(4);
param.setDaemonConf(daemonConf);
Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}
代码示例来源:origin: apache/storm
@Test
@IntegrationTest
public void testCompleteTopologyNetty() throws Exception {
Config daemonConf = new Config();
daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
MkClusterParam param = new MkClusterParam();
param.setSupervisors(4);
param.setDaemonConf(daemonConf);
Testing.withLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}
代码示例来源:origin: apache/storm
public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
int launchTime, int priority, String user, double maxHeapSize) {
Config conf = new Config();
conf.putAll(config);
conf.put(Config.TOPOLOGY_PRIORITY, priority);
conf.put(Config.TOPOLOGY_NAME, name);
conf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
0, genExecsAndComps(topology), launchTime, user);
return topo;
}
代码示例来源:origin: apache/storm
@Test
public void multipleTuplesMutliplesFiles() throws IOException {
AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, schemaV1);
bolt.prepare(new Config(), topologyContext, collector);
bolt.execute(tuple1);
bolt.execute(tuple1);
bolt.execute(tuple1);
bolt.execute(tuple1);
Assert.assertEquals(4, countNonZeroLengthFiles(testRoot));
verifyAllAvroFiles(testRoot);
}
内容来源于网络,如有侵权,请联系作者删除!