本文整理了Java中org.apache.storm.Config.putAll()方法的一些代码示例,展示了Config.putAll()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Config.putAll()方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:putAll
方法说明:暂无
代码示例来源:origin: apache/storm
/**
 * Builds a Storm {@code Config} from a topology definition.
 *
 * <p>A new, empty {@code Config} is created and every entry returned by
 * {@code topologyDef.getConfig()} is copied into it.
 *
 * @param topologyDef topology definition whose config entries are copied
 * @return a new Storm Config holding the definition's config entries
 */
public static Config buildConfig(TopologyDef topologyDef) {
    final Config stormConfig = new Config();
    // Copy the definition's `config` section into the fresh topology config.
    stormConfig.putAll(topologyDef.getConfig());
    return stormConfig;
}
代码示例来源:origin: apache/storm
/**
 * Loads a YAML file and merges its entries into the given config.
 *
 * <p>An empty document contributes nothing. If the document consists of a single
 * top-level key {@code config} whose value is a map, that nested map is merged
 * instead, so the same config file can be used with Flux and with the
 * ConfigurableTopology.
 *
 * @param resource path of the YAML file to read
 * @param conf config that receives the loaded entries
 * @return the same {@code conf} instance that was passed in
 * @throws FileNotFoundException if {@code resource} cannot be opened for reading
 * @throws java.io.UncheckedIOException if closing the file fails
 */
public static Config loadConf(String resource, Config conf)
        throws FileNotFoundException {
    Yaml yaml = new Yaml(new SafeConstructor());
    Map<String, Object> ret;
    // Close the stream deterministically; leaving it open leaks a file handle per call.
    try (FileInputStream in = new FileInputStream(resource);
         InputStreamReader reader = new InputStreamReader(in, Charset.defaultCharset())) {
        ret = (Map<String, Object>) yaml.load(reader);
    } catch (FileNotFoundException e) {
        // Keep the declared contract: a missing file is reported as-is.
        throw e;
    } catch (java.io.IOException e) {
        // Only close() can raise this; the signature does not declare IOException.
        throw new java.io.UncheckedIOException("Unable to close config file " + resource, e);
    }
    if (ret == null) {
        ret = new HashMap<>();
    } else if (ret.size() == 1) {
        // If the config consists of a single key 'config', its values are used
        // instead. This means that the same config files can be used with Flux
        // and the ConfigurableTopology.
        Object confNode = ret.get("config");
        if (confNode instanceof Map) {
            ret = (Map<String, Object>) confNode;
        }
    }
    conf.putAll(ret);
    return conf;
}
代码示例来源:origin: apache/storm
/**
 * Initializes a fake config.
 *
 * <p>Starts from whatever {@code Utils.readDefaultConfig()} returns, then sets
 * {@code Config.NIMBUS_SEEDS} to a one-element list holding {@code NIMBUS_HOST}
 * and {@code Config.NIMBUS_THRIFT_PORT} to {@code NIMBUS_PORT}.
 *
 * @return the populated config
 */
private static Config initializedConfig() {
    ArrayList<String> seedHosts = new ArrayList<>();
    seedHosts.add(NIMBUS_HOST);

    Config fakeConf = new Config();
    fakeConf.putAll(Utils.readDefaultConfig());
    fakeConf.put(Config.NIMBUS_SEEDS, seedHosts);
    fakeConf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
    return fakeConf;
}
代码示例来源:origin: apache/storm
/**
 * Creates the client blob store and keeps it in {@code store}.
 *
 * <p>The store is obtained from {@code Utils.getClientBlobStore} using a config
 * filled from {@code Utils.readStormConfig()}.
 */
public static void prepare() {
    final Config stormConf = new Config();
    stormConf.putAll(Utils.readStormConfig());
    store = Utils.getClientBlobStore(stormConf);
}
代码示例来源:origin: apache/storm
/**
 * Start the topology.
 *
 * <p>Usage: {@code [runDurationSec] [optionalConfFile]}. The run duration
 * stays at -1 when no argument is given.
 *
 * @param args optional run duration in seconds, then an optional config file
 * @throws Exception if argument parsing, config loading or submission fails
 */
public static void main(String[] args) throws Exception {
    final int argCount = args.length;
    final Config topoConf = new Config();
    final int durationSec = argCount > 0 ? Integer.parseInt(args[0]) : -1;
    if (argCount > 1) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }
    if (argCount > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }
    // Submit to Storm cluster
    Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
代码示例来源:origin: apache/storm
/**
 * Submits the "BackPressureTopo" topology.
 *
 * <p>Usage: {@code boltSleepMs [runDurationSec]}. Command-line options read via
 * {@code Utils.readCommandLineOpts()} are merged after the receive-queue-skips
 * setting, so they can override it.
 *
 * @param args optional bolt sleep time in milliseconds, then an optional run duration in seconds
 * @throws Exception if argument parsing or submission fails
 */
public static void main(String[] args) throws Exception {
    final Config topoConf = new Config();
    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 1);
    topoConf.putAll(Utils.readCommandLineOpts());

    final int argCount = args.length;
    if (argCount > 0) {
        // Parsed as an int but stored as a Long value.
        topoConf.put(SLEEP_MS, Long.valueOf(Integer.parseInt(args[0])));
    }
    final int runTime = argCount > 1 ? Integer.parseInt(args[1]) : -1;
    if (argCount > 2) {
        System.err.println("args: boltSleepMs [runDurationSec] ");
        return;
    }
    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, "BackPressureTopo", topoConf, getTopology(topoConf));
}
代码示例来源:origin: apache/storm
/**
 * ConstSpout only topology (no bolts).
 *
 * <p>Usage: {@code [runDurationSec] [optionalConfFile]}. Settings are applied in
 * this order, later ones overriding earlier ones: config file, the fixed tuning
 * values set here, then command-line options.
 *
 * @param args optional run duration in seconds, then an optional config file
 * @throws Exception if argument parsing, config loading or submission fails
 */
public static void main(String[] args) throws Exception {
    final int argCount = args.length;
    final Config topoConf = new Config();
    final int runTime = argCount > 0 ? Integer.parseInt(args[0]) : -1;
    if (argCount > 1) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
    topoConf.putAll(Utils.readCommandLineOpts());

    if (argCount > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }
    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology());
}
}
代码示例来源:origin: apache/storm
/**
 * Builds a topology by invoking a method on an externally defined topology source.
 *
 * <p>The source object is created with {@code buildObject}, and the method to call
 * is located with {@code findGetTopologyMethod} using the method name from the
 * topology definition. If the method's first parameter type is exactly
 * {@code Config}, the definition's config is copied into a new {@code Config}
 * and passed; otherwise the value of {@code getConfig()} is passed unchanged.
 *
 * @param def object definition of the topology source
 * @param context execution context providing the topology definition
 * @return the topology returned by the invoked method
 */
private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
        throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
        InvocationTargetException, NoSuchFieldException {
    Object topologySource = buildObject(def, context);
    String methodName = context.getTopologyDef().getTopologySource().getMethodName();
    Method getTopology = findGetTopologyMethod(topologySource, methodName);

    final Object configArgument;
    if (Config.class.equals(getTopology.getParameterTypes()[0])) {
        Config stormConfig = new Config();
        stormConfig.putAll(context.getTopologyDef().getConfig());
        configArgument = stormConfig;
    } else {
        configArgument = context.getTopologyDef().getConfig();
    }
    return (StormTopology) getTopology.invoke(topologySource, configArgument);
}
代码示例来源:origin: apache/storm
/**
 * Submits the topology; requires exactly two arguments.
 *
 * <p>Usage: {@code runDurationSec topConfFile}. Settings are applied in this
 * order, later ones overriding earlier ones: config file, the fixed tuning
 * values set here, then command-line options.
 *
 * @param args run duration in seconds followed by the topology config file
 * @throws Exception if argument parsing, config loading or submission fails
 */
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("args: runDurationSec topConfFile");
        return;
    }
    final Integer durationSec = Integer.valueOf(args[0]);

    final Config topoConf = new Config();
    topoConf.putAll(Utils.findAndReadConfigFile(args[1]));

    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    topoConf.putAll(Utils.readCommandLineOpts());
    // Submit to Storm cluster
    Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/storm
/**
 * Submits the topology with optional run duration and config file.
 *
 * <p>Usage: {@code [runDurationSec] [optionalConfFile]}. Settings are applied in
 * this order, later ones overriding earlier ones: config file, the fixed tuning
 * values set here, then command-line options.
 *
 * @param args optional run duration in seconds, then an optional config file
 * @throws Exception if argument parsing, config loading or submission fails
 */
public static void main(String[] args) throws Exception {
    final int argCount = args.length;
    final Config topoConf = new Config();
    final int runTime = argCount > 0 ? Integer.parseInt(args[0]) : -1;
    if (argCount > 1) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
    topoConf.putAll(Utils.readCommandLineOpts());

    if (argCount > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }
    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/storm
/**
 * Submits the file-reading topology.
 *
 * <p>Usage: {@code file.txt [runDurationSec] [optionalConfFile]}. The input file
 * is mandatory; the run duration stays at -1 when omitted. Settings are applied
 * in this order, later ones overriding earlier ones: input file, config file,
 * the fixed tuning values set here, then command-line options.
 *
 * @param args input file, then an optional run duration in seconds, then an optional config file
 * @throws Exception if argument parsing, config loading or submission fails
 */
public static void main(String[] args) throws Exception {
    // Validate the argument count before consuming any argument.
    if (args.length == 0 || args.length > 3) {
        System.err.println("args: file.txt [runDurationSec] [optionalConfFile]");
        return;
    }
    int runTime = -1;
    Config topoConf = new Config();
    // The input file is always honoured, not only when all three arguments are present.
    topoConf.put(INPUT_FILE, args[0]);
    if (args.length > 1) {
        runTime = Integer.parseInt(args[1]);
    }
    if (args.length > 2) {
        // The config file is the third argument; args[1] is the run duration.
        topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
    }
    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topoConf.putAll(Utils.readCommandLineOpts());
    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/storm
/**
 * Creates a cluster config for resource-aware scheduling.
 *
 * <p>Starts from whatever {@code Utils.readDefaultConfig()} returns, then sets
 * the network topography plugin, the scheduling priority strategy, the
 * scheduler strategy and the per-component resource values.
 *
 * @param compPcore value stored under {@code Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT}
 * @param compOnHeap value stored under {@code Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB}
 * @param compOffHeap value stored under {@code Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB}
 * @param pools user pools stored under {@code DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS};
 *     skipped when {@code null}
 * @return the populated config
 */
public static Config createClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                                         Map<String, Map<String, Number>> pools) {
    final String topographyPlugin = GenSupervisorsDnsToSwitchMapping.class.getName();
    final String priorityStrategy = DefaultSchedulingPriorityStrategy.class.getName();
    final String schedulerStrategy = DefaultResourceAwareStrategy.class.getName();

    final Config clusterConf = new Config();
    clusterConf.putAll(Utils.readDefaultConfig());
    clusterConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, topographyPlugin);
    clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, priorityStrategy);
    clusterConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, schedulerStrategy);
    clusterConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, compPcore);
    clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, compOffHeap);
    clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, compOnHeap);
    if (pools == null) {
        return clusterConf;
    }
    clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, pools);
    return clusterConf;
}
代码示例来源:origin: apache/storm
/**
 * Builds a {@code TopologyDetails} for a generated topology.
 *
 * <p>The given config is copied into a new {@code Config} with
 * {@code Config.TOPOLOGY_NAME} set to {@code name}. The topology id is
 * {@code name + "-" + launchTime} and the owner is {@code "user"}.
 *
 * @param name topology name
 * @param config base config entries to copy
 * @param numSpout passed to {@code buildTopology}
 * @param numBolt passed to {@code buildTopology}
 * @param spoutParallelism passed to {@code buildTopology} and {@code genExecsAndComps}
 * @param boltParallelism passed to {@code buildTopology} and {@code genExecsAndComps}
 * @param launchTime launch time, also used as the id suffix
 * @param blacklistEnable not read by this method
 * @return the topology details
 */
public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
                                          int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {
    final Config topoConf = new Config();
    topoConf.putAll(config);
    topoConf.put(Config.TOPOLOGY_NAME, name);

    final StormTopology stormTopology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
    final String topologyId = name + "-" + launchTime;
    return new TopologyDetails(topologyId, topoConf, stormTopology,
            3, genExecsAndComps(stormTopology, spoutParallelism, boltParallelism), launchTime, "user");
}
代码示例来源:origin: apache/storm
/**
 * Wraps an existing topology in a {@code TopologyDetails}.
 *
 * <p>The given config is copied into a new {@code Config}, then priority, name,
 * submitter user and worker max heap size are set on the copy. The topology id
 * is {@code name + "-" + launchTime}.
 *
 * @param name topology name
 * @param config base config entries to copy
 * @param topology topology to wrap
 * @param launchTime launch time, also used as the id suffix
 * @param priority value stored under {@code Config.TOPOLOGY_PRIORITY}
 * @param user submitter user, also passed as the owner
 * @param maxHeapSize value stored under {@code Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB}
 * @return the topology details
 */
public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
                                                    int launchTime, int priority, String user, double maxHeapSize) {
    final Config topoConf = new Config();
    topoConf.putAll(config);
    topoConf.put(Config.TOPOLOGY_PRIORITY, priority);
    topoConf.put(Config.TOPOLOGY_NAME, name);
    topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
    topoConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);

    final String topologyId = name + "-" + launchTime;
    return new TopologyDetails(topologyId, topoConf, topology,
            0, genExecsAndComps(topology), launchTime, user);
}
代码示例来源:origin: apache/storm
/**
 * Submits four topologies to a local cluster with a Nimbus daemon and checks
 * that {@code InmemoryTopologySubmitterHook.submittedTopologies} equals the
 * list of submitted topologies, in submission order.
 */
@Test
public void testSubmitTopologyToLocalNimbus() throws Exception {
    final int thriftPort = Utils.getAvailablePort();
    try (ILocalCluster cluster = new LocalCluster.Builder()
            .withNimbusDaemon(true)
            .withDaemonConf(Config.NIMBUS_THRIFT_PORT, thriftPort)
            .build()) {
        final Config topoConf = new Config();
        topoConf.putAll(Utils.readDefaultConfig());
        topoConf.setDebug(true);
        // The default is always "distributed", but a local cluster is being used here.
        topoConf.put("storm.cluster.mode", "local");
        topoConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, InmemoryTopologySubmitterHook.class.getName());
        topoConf.put(Config.NIMBUS_THRIFT_PORT, thriftPort);

        final List<TopologyDetails> submitted = new ArrayList<>();
        for (int submission = 0; submission < 4; submission++) {
            final String topologyName = "word-count-" + UUID.randomUUID();
            final StormTopology stormTopology = createTestTopology();
            submitted.add(new TopologyDetails(topologyName, stormTopology));
            cluster.submitTopology(topologyName, topoConf, stormTopology);
        }
        Assert.assertEquals(InmemoryTopologySubmitterHook.submittedTopologies, submitted);
    }
}
代码示例来源:origin: apache/storm
/**
 * Schedules a single generated topology with the resource-aware scheduler and
 * checks that it ends up on one slot of one node with two executors and the
 * "fully scheduled" status message.
 */
@Test
public void sanityTestOfScheduling() {
    final INimbus nimbus = new INimbusTest();
    final Map<String, SupervisorDetails> supervisors = genSupervisors(1, 2, 400, 2000);

    final Config config = new Config();
    config.putAll(defaultTopologyConf);

    final ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
    final TopologyDetails topology1 = genTopology("topology1", config, 1, 1, 1, 1, 0, 0, "user");
    final Topologies topologies = new Topologies(topology1);
    final Cluster cluster = new Cluster(nimbus, new ResourceMetrics(new StormMetricsRegistry()),
            supervisors, new HashMap<>(), topologies, config);

    scheduler.prepare(config);
    scheduler.schedule(topologies, cluster);

    final SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
    final Set<WorkerSlot> assignedSlots = assignment.getSlots();
    // Collect the distinct nodes the assigned slots live on.
    final Set<String> nodeIds = new HashSet<>();
    for (WorkerSlot assignedSlot : assignedSlots) {
        nodeIds.add(assignedSlot.getNodeId());
    }
    final Collection<ExecutorDetails> executors = assignment.getExecutors();

    assertEquals(1, assignedSlots.size());
    assertEquals(1, nodeIds.size());
    assertEquals(2, executors.size());
    assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
}
代码示例来源:origin: apache/storm
/**
 * Runs the blacklist scheduler through four scheduling rounds and expects
 * "host-0" to be the only blacklisted host afterwards.
 *
 * <p>Rounds two and three use a supervisor map produced by
 * {@code removePortFromSupervisors(supMap, "sup-0", 0)}; the final round uses
 * the original supervisor map again with an empty assignment map.
 */
@Test
public void TestBadSlot() {
INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
// NOTE(review): presumably 3 supervisors with 4 ports each - confirm against genSupervisors.
Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
// Blacklist settings for this test, applied on top of the defaults.
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
topoMap.put(topo1.getId(), topo1);
Topologies topologies = new Topologies(topoMap);
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
// Round 1: all supervisors, no prior assignments.
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
bs.prepare(config);
bs.schedule(topologies, cluster);
// Rounds 2 and 3: supervisor map from removePortFromSupervisors for "sup-0" / 0, carrying over the previous assignments.
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
bs.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removePortFromSupervisors(supMap, "sup-0", 0), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
bs.schedule(topologies, cluster);
// Round 4: original supervisor map again, starting from an empty assignment map.
cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
bs.schedule(topologies, cluster);
Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
}
代码示例来源:origin: apache/storm
config.putAll(Utils.readDefaultConfig());
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
代码示例来源:origin: apache/storm
config.putAll(Utils.readDefaultConfig());
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
代码示例来源:origin: apache/storm
/**
 * Runs the blacklist scheduler through four scheduling rounds and expects
 * "host-0" to be the only blacklisted host afterwards.
 *
 * <p>Rounds two and three use a supervisor map produced by
 * {@code removeSupervisorFromSupervisors(supMap, "sup-0")}; the final round
 * uses the original supervisor map again and carries over the previous
 * assignments.
 */
@Test
public void TestBadSupervisor() {
INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
// NOTE(review): presumably 3 supervisors with 4 ports each - confirm against genSupervisors.
Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
// Blacklist settings for this test, applied on top of the defaults.
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true);
topoMap.put(topo1.getId(), topo1);
Topologies topologies = new Topologies(topoMap);
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
// Round 1: all supervisors, no prior assignments.
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<>(), topologies, config);
BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
bs.prepare(config);
bs.schedule(topologies, cluster);
// Rounds 2 and 3: supervisor map from removeSupervisorFromSupervisors for "sup-0", carrying over the previous assignments.
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
bs.schedule(topologies, cluster);
cluster = new Cluster(iNimbus, resourceMetrics, TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0"), TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
bs.schedule(topologies, cluster);
// Round 4: original supervisor map again, still carrying over the previous assignments.
cluster = new Cluster(iNimbus, resourceMetrics, supMap, TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
bs.schedule(topologies, cluster);
Assert.assertEquals("blacklist", Collections.singleton("host-0"), cluster.getBlacklistedHosts());
}
内容来源于网络,如有侵权,请联系作者删除!