本文整理了Java中org.apache.storm.Config.put()方法的一些代码示例,展示了Config.put()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Config.put()方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:put
暂无
代码示例来源:origin: apache/storm
/**
 * Records the maximum heap size allowed per worker for this topology.
 *
 * <p>The value is stored under {@code Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB}. A {@code null}
 * argument is ignored, so nothing is written in that case.
 *
 * @param size the maximum heap size for a worker (presumably in megabytes, judging by the key
 *             name); may be {@code null}
 */
public void setTopologyWorkerMaxHeapSize(Number size) {
    if (size == null) {
        // Nothing to record; leave the config untouched.
        return;
    }
    put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, size);
}
代码示例来源:origin: apache/storm
/**
 * Builds a fake config: the default config plus a single nimbus seed and the nimbus thrift port.
 *
 * @return the populated config
 */
private static Config initializedConfig() {
    // The seed list holds exactly one entry, the test nimbus host.
    ArrayList<String> seeds = new ArrayList<>();
    seeds.add(NIMBUS_HOST);

    Config fakeConf = new Config();
    fakeConf.putAll(Utils.readDefaultConfig());
    fakeConf.put(Config.NIMBUS_SEEDS, seeds);
    fakeConf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
    return fakeConf;
}
代码示例来源:origin: apache/storm
/**
 * Sets the maximum number of states that will be searched in the constraint solver strategy.
 *
 * <p>The value is stored under {@code Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH}.
 *
 * @param numStates maximum number of states to search
 */
public void setTopologyConstraintsMaxStateSearch(int numStates) {
    put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, numStates);
}
代码示例来源:origin: apache/storm
/**
 * Entry point: assembles the topology config and hands the topology to the cluster helper.
 *
 * <p>Config entries are applied in this order: the config file named by {@code args[1]}, the
 * fixed settings below, then the command-line options.
 *
 * @param args {@code args[0]} is the run duration in seconds, {@code args[1]} the topology
 *             config file; any other argument count prints a usage line and returns
 * @throws Exception propagated from config loading or from the helper call
 */
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("args: runDurationSec topConfFile");
        return;
    }

    final Integer runSeconds = Integer.valueOf(args[0]);
    final String confFile = args[1];

    Config conf = new Config();
    conf.putAll(Utils.findAndReadConfigFile(confFile));

    // Fixed settings for this run.
    conf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    conf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    conf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    // Command-line options are applied last.
    conf.putAll(Utils.readCommandLineOpts());

    // Submit to Storm cluster
    Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, conf, getTopology(conf));
}
}
代码示例来源:origin: apache/storm
/**
 * Sets the priority for a topology.
 *
 * @param priority the priority value to store under {@code Config.TOPOLOGY_PRIORITY}
 */
public void setTopologyPriority(int priority) {
    put(Config.TOPOLOGY_PRIORITY, priority);
}
代码示例来源:origin: apache/storm
/**
 * Creates a cluster config: the default config plus scheduler plugin/strategy class names and
 * per-component resource settings.
 *
 * @param compPcore   value stored under {@code Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT}
 * @param compOnHeap  value stored under {@code Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB}
 * @param compOffHeap value stored under {@code Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB}
 * @param pools       user pools stored under {@code DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS};
 *                    skipped when {@code null}
 * @return the populated config
 */
public static Config createClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                                         Map<String, Map<String, Number>> pools) {
    final String topographyPlugin = GenSupervisorsDnsToSwitchMapping.class.getName();
    final String priorityStrategy = DefaultSchedulingPriorityStrategy.class.getName();
    final String schedulerStrategy = DefaultResourceAwareStrategy.class.getName();

    Config clusterConf = new Config();
    clusterConf.putAll(Utils.readDefaultConfig());
    clusterConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, topographyPlugin);
    clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, priorityStrategy);
    clusterConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, schedulerStrategy);
    clusterConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, compPcore);
    clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, compOffHeap);
    clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, compOnHeap);

    if (pools == null) {
        // No user pools supplied; the pools key is not written.
        return clusterConf;
    }
    clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, pools);
    return clusterConf;
}
代码示例来源:origin: apache/storm
/**
 * Stores the given strategy under {@code Config.TOPOLOGY_SCHEDULER_STRATEGY}.
 *
 * @param strategy the scheduler strategy value (other snippets in this file pass a class name
 *                 here; presumably the same is expected)
 */
public void setTopologyStrategy(String strategy) {
    put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy);
}
代码示例来源:origin: apache/storm
/**
 * Wraps a topology in a {@code TopologyDetails} whose config is a copy of {@code config} with
 * priority, name, submitter user and worker max heap size added.
 *
 * @param name        topology name; also the prefix of the generated id {@code name-launchTime}
 * @param config      base entries copied into the new config
 * @param topology    the topology to describe
 * @param launchTime  launch time, used both in the id and as a constructor argument
 * @param priority    value stored under {@code Config.TOPOLOGY_PRIORITY}
 * @param user        value stored under {@code Config.TOPOLOGY_SUBMITTER_USER} and passed on
 * @param maxHeapSize value stored under {@code Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB}
 * @return the constructed topology details
 */
public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
                                                    int launchTime, int priority, String user, double maxHeapSize) {
    Config topoConf = new Config();
    topoConf.putAll(config);
    topoConf.put(Config.TOPOLOGY_PRIORITY, priority);
    topoConf.put(Config.TOPOLOGY_NAME, name);
    topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
    topoConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);

    final String topoId = name + "-" + launchTime;
    return new TopologyDetails(topoId, topoConf, topology,
                               0, genExecsAndComps(topology), launchTime, user);
}
代码示例来源:origin: apache/storm
/**
 * {@inheritDoc}
 */
@Override
public Map<String, Object> getComponentConfiguration() {
    final int tickEverySecs = 1;
    Config componentConf = new Config();
    // A tick tuple every second forces acknowledgement of pending tuples.
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickEverySecs);
    return componentConf;
}
代码示例来源:origin: apache/storm
/**
 * Builds a topology with the given spout/bolt counts and parallelism and wraps it in a
 * {@code TopologyDetails} with id {@code name-launchTime}.
 *
 * @param name             topology name, stored under {@code Config.TOPOLOGY_NAME}
 * @param config           base entries copied into the new config
 * @param numSpout         number of spouts passed to {@code buildTopology}
 * @param numBolt          number of bolts passed to {@code buildTopology}
 * @param spoutParallelism spout parallelism passed to the builder helpers
 * @param boltParallelism  bolt parallelism passed to the builder helpers
 * @param launchTime       launch time, used in the id and as a constructor argument
 * @param blacklistEnable  not read by this method
 * @return the constructed topology details
 */
public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
                                          int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {
    Config topoConf = new Config();
    topoConf.putAll(config);
    topoConf.put(Config.TOPOLOGY_NAME, name);

    StormTopology built = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
    final String topoId = name + "-" + launchTime;
    return new TopologyDetails(topoId, topoConf, built,
                               3, genExecsAndComps(built, spoutParallelism, boltParallelism), launchTime, "user");
}
代码示例来源:origin: apache/storm
/**
 * {@inheritDoc}
 */
@Override
public Map<String, Object> getComponentConfiguration() {
    // The only entry is the tick tuple frequency taken from this component's field.
    Config componentConf = new Config();
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
    return componentConf;
}
代码示例来源:origin: apache/storm
/**
 * Seeds the shared default topology config with a worker max heap size and a priority.
 */
@BeforeAll
public static void initConf() {
    final double workerMaxHeapSize = 8192.0;
    final int topologyPriority = 0;
    defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, workerMaxHeapSize);
    defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, topologyPriority);
}
代码示例来源:origin: apache/storm
/**
 * Creates a cluster config via {@code createClusterConfig} and then adds the generic resource
 * map and switches the scheduler strategy to {@code GenericResourceAwareStrategy}.
 *
 * @param compPcore          forwarded to {@code createClusterConfig}
 * @param compOnHeap         forwarded to {@code createClusterConfig}
 * @param compOffHeap        forwarded to {@code createClusterConfig}
 * @param pools              forwarded to {@code createClusterConfig}
 * @param genericResourceMap value stored under {@code Config.TOPOLOGY_COMPONENT_RESOURCES_MAP}
 * @return the populated config
 */
public static Config createGrasClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                                             Map<String, Map<String, Number>> pools, Map<String, Double> genericResourceMap) {
    final String grasStrategy = GenericResourceAwareStrategy.class.getName();

    Config grasConf = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
    grasConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, genericResourceMap);
    // Replaces the strategy entry written by createClusterConfig.
    grasConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, grasStrategy);
    return grasConf;
}
代码示例来源:origin: apache/storm
/**
 * Entry point: submits the windowing word-count topology with an HBase-backed window store.
 *
 * @param args optional; {@code args[0]} overrides the default topology name
 *             {@code wordCounterWithWindowing}
 * @throws Exception propagated from store setup or topology submission
 */
public static void main(String[] args) throws Exception {
    Config topoConf = new Config();
    topoConf.setMaxSpoutPending(20);
    topoConf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);

    // window-state table should already be created with cf:tuples column
    final byte[] family = "cf".getBytes("UTF-8");
    final byte[] qualifier = "tuples".getBytes("UTF-8");
    HBaseWindowsStoreFactory storeFactory =
        new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", family, qualifier);

    final String topoName = args.length > 0 ? args[0] : "wordCounterWithWindowing";

    topoConf.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, buildTopology(storeFactory));
}
代码示例来源:origin: apache/storm
/**
 * Runs the heterogeneous cluster test with a clone of the default config whose scheduler
 * strategy is {@code GenericResourceAwareStrategy}.
 */
@Test
public void testHeterogeneousClusterwithGras() {
    final Class<GenericResourceAwareStrategy> strategy = GenericResourceAwareStrategy.class;

    // Work on a clone so the shared default config is not modified.
    Config grasConf = (Config) defaultTopologyConf.clone();
    grasConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy.getName());

    testHeterogeneousCluster(grasConf, strategy.getSimpleName());
}
代码示例来源:origin: apache/storm
/**
 * Verifies that the file config loader returns {@code null} (not a map) when the file it is
 * pointed at contains only the text {@code ThisIsNotValidYaml}.
 *
 * @throws Exception if the temp file cannot be created or written
 */
@Test
public void testMalformedYaml() throws Exception {
    File temp = File.createTempFile("FileLoader", ".yaml");
    temp.deleteOnExit();

    String outputData = "ThisIsNotValidYaml";
    // try-with-resources closes (and thereby flushes) the writer even if the write throws,
    // so the file handle is never leaked.
    try (FileWriter fw = new FileWriter(temp)) {
        fw.write(outputData, 0, outputData.length());
    }

    Config conf = new Config();
    conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX + temp.getCanonicalPath());
    FileConfigLoader testLoader = new FileConfigLoader(conf);
    Map<String, Object> result = testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
    Assert.assertNull("Unexpectedly returned a map", result);
}
代码示例来源:origin: apache/storm
/**
 * Verifies that the mocked artifactory loader returns {@code null} (not a map) when the
 * downloaded content is the text {@code ThisIsNotValidYaml}.
 *
 * @throws Exception propagated from the loader mock
 */
@Test
public void testMalformedYaml() throws Exception {
    // This is a test where we are configured to point right at a single artifact
    final String artifactUri =
        ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/this/artifact";

    Config loaderConf = new Config();
    loaderConf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, artifactUri);
    loaderConf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());

    ArtifactoryConfigLoaderMock mockLoader = new ArtifactoryConfigLoaderMock(loaderConf);
    mockLoader.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}");
    mockLoader.setData(null, null, "ThisIsNotValidYaml");

    Map<String, Object> loaded = mockLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
    Assert.assertNull("Unexpectedly returned a map", loaded);
}
代码示例来源:origin: apache/storm
/**
 * Verifies that the file config loader returns {@code null} when its URI is
 * {@code FILE_SCHEME_PREFIX + "/file/not/exist/"}.
 */
@Test
public void testFileNotThere() {
    final String missingFileUri = FILE_SCHEME_PREFIX + "/file/not/exist/";

    Config loaderConf = new Config();
    loaderConf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, missingFileUri);

    Map<String, Object> loaded =
        new FileConfigLoader(loaderConf).load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
    Assert.assertNull("Unexpectedly returned a map", loaded);
}
代码示例来源:origin: apache/storm
/**
 * Runs the complete-topology test job on a simulated-time local cluster with four supervisors
 * and {@code Config.STORM_LOCAL_MODE_ZMQ} set to {@code true}.
 *
 * @throws Exception propagated from the test job
 */
@Test
@IntegrationTest
public void testCompleteTopologyNettySimulated() throws Exception {
    Config clusterDaemonConf = new Config();
    clusterDaemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);

    MkClusterParam clusterParam = new MkClusterParam();
    clusterParam.setSupervisors(4);
    clusterParam.setDaemonConf(clusterDaemonConf);

    Testing.withSimulatedTimeLocalCluster(clusterParam, COMPLETE_TOPOLOGY_TESTJOB);
}
代码示例来源:origin: apache/storm
/**
 * Runs the complete-topology test job on a local cluster with four supervisors and
 * {@code Config.STORM_LOCAL_MODE_ZMQ} set to {@code true}.
 *
 * @throws Exception propagated from the test job
 */
@Test
@IntegrationTest
public void testCompleteTopologyNetty() throws Exception {
    Config clusterDaemonConf = new Config();
    clusterDaemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);

    MkClusterParam clusterParam = new MkClusterParam();
    clusterParam.setSupervisors(4);
    clusterParam.setDaemonConf(clusterDaemonConf);

    Testing.withLocalCluster(clusterParam, COMPLETE_TOPOLOGY_TESTJOB);
}
内容来源于网络,如有侵权,请联系作者删除!