org.apache.storm.Config.putAll()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(15.3k)|赞(0)|评价(0)|浏览(135)

本文整理了Java中org.apache.storm.Config.putAll()方法的一些代码示例,展示了Config.putAll()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Config.putAll()方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:putAll

Config.putAll介绍

暂无

代码示例

代码示例来源:origin: apache/storm

/**
 * Builds a Storm {@code Config} from the configuration carried by a topology definition.
 *
 * @param topologyDef topology definition whose config entries are copied
 * @return a new Config holding every entry returned by {@code topologyDef.getConfig()}
 */
public static Config buildConfig(TopologyDef topologyDef) {
  final Config stormConfig = new Config();
  // Copy the definition's `config` section into the fresh topology config.
  stormConfig.putAll(topologyDef.getConfig());
  return stormConfig;
}

代码示例来源:origin: apache/storm

/**
 * Loads a YAML file and merges its entries into the given config.
 *
 * <p>If the parsed document consists of a single key {@code config} whose value is a map,
 * that nested map is merged instead. This means that the same config files can be used
 * with Flux and the ConfigurableTopology. An empty document merges nothing.
 *
 * @param resource path of the YAML file to read
 * @param conf config that receives the loaded entries
 * @return the same {@code conf} instance, after merging
 * @throws FileNotFoundException if {@code resource} cannot be opened
 * @throws java.io.UncheckedIOException if closing the file fails
 */
public static Config loadConf(String resource, Config conf)
  throws FileNotFoundException {
  Yaml yaml = new Yaml(new SafeConstructor());
  Map<String, Object> ret;
  // try-with-resources guarantees the file handle is released once parsing is done.
  try (java.io.Reader reader = new InputStreamReader(
    new FileInputStream(resource), Charset.defaultCharset())) {
    ret = (Map<String, Object>) yaml.load(reader);
  } catch (FileNotFoundException e) {
    // Keep the declared contract for a file that cannot be opened.
    throw e;
  } catch (java.io.IOException e) {
    // Raised when closing the reader fails; the signature cannot carry a broader checked type.
    throw new java.io.UncheckedIOException(e);
  }
  if (ret == null) {
    ret = new HashMap<>();
  } else if (ret.size() == 1) {
    // A lone 'config' key wrapping a map: use the wrapped values instead.
    Object confNode = ret.get("config");
    if (confNode instanceof Map) {
      ret = (Map<String, Object>) confNode;
    }
  }
  conf.putAll(ret);
  return conf;
}

代码示例来源:origin: apache/storm

/**
 * Builds a fake config: the entries from {@code Utils.readDefaultConfig()} plus the
 * Nimbus seed list (containing only {@code NIMBUS_HOST}) and the Nimbus thrift port.
 *
 * @return conf
 */
private static Config initializedConfig() {
  // Single-element seed list pointing at the fake Nimbus host.
  final ArrayList<String> seedHosts = new ArrayList<>();
  seedHosts.add(NIMBUS_HOST);

  final Config fakeConf = new Config();
  fakeConf.putAll(Utils.readDefaultConfig());
  fakeConf.put(Config.NIMBUS_SEEDS, seedHosts);
  fakeConf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
  return fakeConf;
}

代码示例来源:origin: apache/storm

/**
 * Assigns {@code store} a client blob store obtained from a config populated
 * with the entries of {@code Utils.readStormConfig()}.
 */
public static void prepare() {
  final Config stormConf = new Config();
  stormConf.putAll(Utils.readStormConfig());
  store = Utils.getClientBlobStore(stormConf);
}

代码示例来源:origin: apache/storm

/**
 * Start the topology.
 *
 * <p>Usage: {@code [runDurationSec] [optionalConfFile]}. When no duration is given,
 * {@code -1} is passed on to the helper.
 */
public static void main(String[] args) throws Exception {
  final Config topologyConfig = new Config();
  final int runSeconds = (args.length > 0) ? Integer.parseInt(args[0]) : -1;
  if (args.length > 1) {
    // Second argument names a config file whose entries are merged in.
    topologyConfig.putAll(Utils.findAndReadConfigFile(args[1]));
  }
  if (args.length > 2) {
    System.err.println("args: [runDurationSec]  [optionalConfFile]");
    return;
  }
  // Hand the topology over to the Storm cluster
  Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, topologyConfig, getTopology(topologyConfig));
}

代码示例来源:origin: apache/storm

/**
 * Entry point. Usage: {@code boltSleepMs [runDurationSec]}.
 *
 * <p>Command-line options from {@code Utils.readCommandLineOpts()} are merged after the
 * spout receive-queue-skips setting, so they can override it.
 */
public static void main(String[] args) throws Exception {
  final Config topologyConfig = new Config();
  topologyConfig.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 1);
  topologyConfig.putAll(Utils.readCommandLineOpts());
  if (args.length > 0) {
    // Widened to long so the value is stored as a Long.
    final long boltSleepMs = Integer.parseInt(args[0]);
    topologyConfig.put(SLEEP_MS, boltSleepMs);
  }
  final int runSeconds = (args.length > 1) ? Integer.parseInt(args[1]) : -1;
  if (args.length > 2) {
    System.err.println("args: boltSleepMs [runDurationSec] ");
    return;
  }
  // Hand the topology over to the Storm cluster
  Helper.runOnClusterAndPrintMetrics(runSeconds, "BackPressureTopo", topologyConfig, getTopology(topologyConfig));
}

代码示例来源:origin: apache/storm

/**
   * ConstSpout only topology  (No bolts).
   *
   * <p>Usage: {@code [runDurationSec] [optionalConfFile]}. Settings are applied in this
   * order, later ones overriding earlier ones: config file, the fixed values set here,
   * then command-line options.
   */
  public static void main(String[] args) throws Exception {
    final Config topologyConfig = new Config();
    final int runSeconds = (args.length > 0) ? Integer.parseInt(args[0]) : -1;
    if (args.length > 1) {
      topologyConfig.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    // Fixed settings for this topology
    topologyConfig.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
    topologyConfig.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topologyConfig.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    topologyConfig.putAll(Utils.readCommandLineOpts());
    if (args.length > 2) {
      System.err.println("args: [runDurationSec]  [optionalConfFile]");
      return;
    }
    // Hand the topology over to the Storm cluster
    Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, topologyConfig, getTopology());
  }
}

代码示例来源:origin: apache/storm

/**
 * Builds a topology by reflectively invoking the "get topology" method of an external
 * topology source object.
 *
 * <p>If the method's first parameter type is exactly {@code Config}, the topology
 * definition's config is copied into a new {@code Config} and passed; otherwise the
 * definition's config object is passed as-is.
 *
 * @param def definition of the topology source object to instantiate
 * @param context execution context providing the topology definition
 * @return the topology returned by the invoked method
 */
private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
    throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
    InvocationTargetException, NoSuchFieldException {
  Object topologySource = buildObject(def, context);
  String methodName = context.getTopologyDef().getTopologySource().getMethodName();
  Method getTopology = findGetTopologyMethod(topologySource, methodName);

  // Pick the single argument according to the declared type of the first parameter.
  final Object invocationArg;
  if (Config.class.equals(getTopology.getParameterTypes()[0])) {
    Config stormConfig = new Config();
    stormConfig.putAll(context.getTopologyDef().getConfig());
    invocationArg = stormConfig;
  } else {
    invocationArg = context.getTopologyDef().getConfig();
  }
  return (StormTopology) getTopology.invoke(topologySource, invocationArg);
}

代码示例来源:origin: apache/storm

/**
 * Entry point. Usage: {@code runDurationSec topConfFile} (both required).
 *
 * <p>Settings are applied in this order, later ones overriding earlier ones: config
 * file, the fixed values set here, then command-line options.
 */
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("args: runDurationSec topConfFile");
      return;
    }

    final Integer runSeconds = Integer.valueOf(args[0]);
    final Config topologyConfig = new Config();
    topologyConfig.putAll(Utils.findAndReadConfigFile(args[1]));

    // Fixed settings for this topology
    topologyConfig.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topologyConfig.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topologyConfig.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topologyConfig.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topologyConfig.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    topologyConfig.putAll(Utils.readCommandLineOpts());
    // Hand the topology over to the Storm cluster
    Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, topologyConfig, getTopology(topologyConfig));
  }
}

代码示例来源:origin: apache/storm

/**
 * Entry point. Usage: {@code [runDurationSec] [optionalConfFile]}.
 *
 * <p>Settings are applied in this order, later ones overriding earlier ones: config
 * file, the fixed values set here, then command-line options.
 */
public static void main(String[] args) throws Exception {
    final Config topologyConfig = new Config();
    final int runSeconds = (args.length > 0) ? Integer.parseInt(args[0]) : -1;
    if (args.length > 1) {
      topologyConfig.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    // Fixed settings for this topology
    topologyConfig.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topologyConfig.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topologyConfig.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topologyConfig.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topologyConfig.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    topologyConfig.putAll(Utils.readCommandLineOpts());
    if (args.length > 2) {
      System.err.println("args: [runDurationSec]  [optionalConfFile]");
      return;
    }
    // Hand the topology over to the Storm cluster
    Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, topologyConfig, getTopology(topologyConfig));
  }
}

代码示例来源:origin: apache/storm

/**
 * Entry point. Usage: {@code file.txt [runDurationSec] [optionalConfFile]}.
 *
 * <p>The first argument (input file) is required; the run duration and the config file
 * are optional. Settings are applied in this order, later ones overriding earlier ones:
 * input file, config file, the fixed values set here, then command-line options.
 */
public static void main(String[] args) throws Exception {
    // Reject a bad argument count before consuming any argument.
    if (args.length == 0 || args.length > 3) {
      System.err.println("args: file.txt [runDurationSec]  [optionalConfFile]");
      return;
    }

    int runTime = -1;
    Config topoConf = new Config();
    // The input file is mandatory and is always the first argument.
    topoConf.put(INPUT_FILE, args[0]);
    if (args.length > 1) {
      runTime = Integer.parseInt(args[1]);
    }
    if (args.length > 2) {
      // The optional config file is the third argument; args[1] is the run duration.
      topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
    }

    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);

    topoConf.putAll(Utils.readCommandLineOpts());
    //  Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
  }
}

代码示例来源:origin: apache/storm

/**
 * Creates a cluster config: the entries from {@code Utils.readDefaultConfig()} overlaid
 * with the scheduler/plugin class names and per-component resource values set here.
 *
 * @param compPcore value stored under {@code TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT}
 * @param compOnHeap value stored under {@code TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB}
 * @param compOffHeap value stored under {@code TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB}
 * @param pools user pools to store under {@code RESOURCE_AWARE_SCHEDULER_USER_POOLS};
 *     skipped when {@code null}
 * @return the populated config
 */
public static Config createClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                     Map<String, Map<String, Number>> pools) {
  final Config clusterConf = new Config();
  clusterConf.putAll(Utils.readDefaultConfig());

  // Plugin and strategy implementations, referenced by class name
  final String dnsMappingClass = GenSupervisorsDnsToSwitchMapping.class.getName();
  final String priorityStrategyClass = DefaultSchedulingPriorityStrategy.class.getName();
  final String schedulerStrategyClass = DefaultResourceAwareStrategy.class.getName();
  clusterConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, dnsMappingClass);
  clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, priorityStrategyClass);
  clusterConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, schedulerStrategyClass);

  // Per-component resource values
  clusterConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, compPcore);
  clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, compOffHeap);
  clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, compOnHeap);

  if (pools == null) {
    return clusterConf;
  }
  clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, pools);
  return clusterConf;
}

代码示例来源:origin: apache/storm

/**
 * Creates a {@code TopologyDetails} for a generated topology.
 *
 * <p>The given config is copied and {@code TOPOLOGY_NAME} is set to {@code name}; the
 * topology id is {@code name + "-" + launchTime}. The {@code blacklistEnable} argument
 * is not used by this method.
 *
 * @return the topology details, built with the fixed value 3 and owner "user"
 */
public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
                     int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {
  final Config topologyConf = new Config();
  topologyConf.putAll(config);
  topologyConf.put(Config.TOPOLOGY_NAME, name);

  final StormTopology generated = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
  final String topologyId = name + "-" + launchTime;
  return new TopologyDetails(topologyId, topologyConf, generated,
      3, genExecsAndComps(generated, spoutParallelism, boltParallelism), launchTime, "user");
}

代码示例来源:origin: apache/storm

/**
 * Wraps an existing topology in a {@code TopologyDetails}.
 *
 * <p>The given config is copied and then overlaid with the priority, name, submitter
 * user and max worker heap size; the topology id is {@code name + "-" + launchTime}.
 *
 * @return the topology details, built with the fixed value 0 and owned by {@code user}
 */
public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
                          int launchTime, int priority, String user, double maxHeapSize) {
  final Config topologyConf = new Config();
  topologyConf.putAll(config);
  // Explicit arguments take precedence over whatever the supplied config contained.
  topologyConf.put(Config.TOPOLOGY_PRIORITY, priority);
  topologyConf.put(Config.TOPOLOGY_NAME, name);
  topologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
  topologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);

  final String topologyId = name + "-" + launchTime;
  return new TopologyDetails(topologyId, topologyConf, topology,
    0, genExecsAndComps(topology), launchTime, user);
}

代码示例来源:origin: apache/storm

/**
 * Submits four topologies to a local cluster with a Nimbus daemon and checks that the
 * in-memory submitter hook recorded the same topologies.
 */
@Test
public void testSubmitTopologyToLocalNimbus() throws Exception {
  final int thriftPort = Utils.getAvailablePort();
  try (ILocalCluster cluster = new LocalCluster.Builder()
    .withNimbusDaemon(true)
    .withDaemonConf(Config.NIMBUS_THRIFT_PORT, thriftPort)
    .build()) {
    final Config submitConf = new Config();
    submitConf.putAll(Utils.readDefaultConfig());
    submitConf.setDebug(true);
    // default is always "distributed" but here local cluster is being used.
    submitConf.put("storm.cluster.mode", "local");
    submitConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, InmemoryTopologySubmitterHook.class.getName());
    submitConf.put(Config.NIMBUS_THRIFT_PORT, thriftPort);

    final List<TopologyDetails> expectedSubmissions = new ArrayList<>();
    for (int submission = 0; submission < 4; submission++) {
      // Random suffix gives each submission a distinct name.
      final String uniqueName = "word-count-" + UUID.randomUUID();
      final StormTopology wordCount = createTestTopology();
      expectedSubmissions.add(new TopologyDetails(uniqueName, wordCount));
      cluster.submitTopology(uniqueName, submitConf, wordCount);
    }
    Assert.assertEquals(InmemoryTopologySubmitterHook.submittedTopologies, expectedSubmissions);
  }
}

代码示例来源:origin: apache/storm

/**
 * Schedules one generated topology with the resource aware scheduler and checks the
 * resulting assignment: one slot, one node, two executors, and the expected status text.
 */
@Test
public void sanityTestOfScheduling() {
  final INimbus nimbus = new INimbusTest();
  final Map<String, SupervisorDetails> supervisors = genSupervisors(1, 2, 400, 2000);
  final Config schedulerConf = new Config();
  schedulerConf.putAll(defaultTopologyConf);
  final ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
  final TopologyDetails topology = genTopology("topology1", schedulerConf, 1, 1, 1, 1, 0, 0, "user");
  final Topologies topologies = new Topologies(topology);
  final Cluster cluster = new Cluster(
      nimbus,
      new ResourceMetrics(new StormMetricsRegistry()),
      supervisors,
      new HashMap<>(),
      topologies,
      schedulerConf);

  scheduler.prepare(schedulerConf);
  scheduler.schedule(topologies, cluster);

  final SchedulerAssignment assignment = cluster.getAssignmentById(topology.getId());
  final Set<WorkerSlot> usedSlots = assignment.getSlots();
  // Collect the distinct node ids behind the assigned slots.
  final Set<String> usedNodeIds = new HashSet<>();
  usedSlots.forEach(slot -> usedNodeIds.add(slot.getNodeId()));
  final Collection<ExecutorDetails> assignedExecutors = assignment.getExecutors();

  assertEquals(1, usedSlots.size());
  assertEquals(1, usedNodeIds.size());
  assertEquals(2, assignedExecutors.size());
  assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
}

代码示例来源:origin: apache/storm

/**
 * Runs the blacklist scheduler over four successive cluster snapshots, two of them
 * built with {@code removePortFromSupervisors(..., "sup-0", 0)}, and expects
 * {@code host-0} to end up as the only blacklisted host.
 */
@Test
public void TestBadSlot() {
  final INimbus nimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
  final Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);

  final Config schedulerConf = new Config();
  schedulerConf.putAll(Utils.readDefaultConfig());
  schedulerConf.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
  schedulerConf.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
  schedulerConf.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

  final Map<String, TopologyDetails> topologiesById = new HashMap<>();
  final TopologyDetails topology = TestUtilsForBlacklistScheduler.getTopology(
      "topo-1", schedulerConf, 5, 15, 1, 1, currentTime - 2, true);
  topologiesById.put(topology.getId(), topology);
  final Topologies topologies = new Topologies(topologiesById);

  final StormMetricsRegistry registry = new StormMetricsRegistry();
  final ResourceMetrics metrics = new ResourceMetrics(registry);

  // Snapshot 1: all supervisors, no prior assignments.
  Cluster snapshot = new Cluster(nimbus, metrics, supervisors, new HashMap<>(), topologies, schedulerConf);
  final BlacklistScheduler blacklistScheduler = new BlacklistScheduler(new DefaultScheduler(), registry);
  blacklistScheduler.prepare(schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);

  // Snapshots 2 and 3: port 0 removed from "sup-0", carrying over the previous assignments.
  snapshot = new Cluster(nimbus, metrics,
      TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, "sup-0", 0),
      TestUtilsForBlacklistScheduler.assignmentMapToImpl(snapshot.getAssignments()),
      topologies, schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);
  snapshot = new Cluster(nimbus, metrics,
      TestUtilsForBlacklistScheduler.removePortFromSupervisors(supervisors, "sup-0", 0),
      TestUtilsForBlacklistScheduler.assignmentMapToImpl(snapshot.getAssignments()),
      topologies, schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);

  // Snapshot 4: full supervisor map again, starting from empty assignments.
  snapshot = new Cluster(nimbus, metrics, supervisors, new HashMap<>(), topologies, schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);

  Assert.assertEquals("blacklist", Collections.singleton("host-0"), snapshot.getBlacklistedHosts());
}

代码示例来源:origin: apache/storm

// Excerpt: load the entries from Utils.readDefaultConfig(), then override the
// blacklist scheduler's tolerance time and tolerance count settings.
config.putAll(Utils.readDefaultConfig());
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);

代码示例来源:origin: apache/storm

// Excerpt: load the entries from Utils.readDefaultConfig(), then override the
// blacklist scheduler's tolerance time and tolerance count settings.
config.putAll(Utils.readDefaultConfig());
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);

代码示例来源:origin: apache/storm

/**
 * Runs the blacklist scheduler over four successive cluster snapshots, two of them
 * built with {@code removeSupervisorFromSupervisors(..., "sup-0")}, and expects
 * {@code host-0} to end up as the only blacklisted host.
 */
@Test
public void TestBadSupervisor() {
  final INimbus nimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
  final Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);

  final Config schedulerConf = new Config();
  schedulerConf.putAll(Utils.readDefaultConfig());
  schedulerConf.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
  schedulerConf.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
  schedulerConf.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 300);

  final Map<String, TopologyDetails> topologiesById = new HashMap<>();
  final TopologyDetails topology = TestUtilsForBlacklistScheduler.getTopology(
      "topo-1", schedulerConf, 5, 15, 1, 1, currentTime - 2, true);
  topologiesById.put(topology.getId(), topology);
  final Topologies topologies = new Topologies(topologiesById);

  final StormMetricsRegistry registry = new StormMetricsRegistry();
  final ResourceMetrics metrics = new ResourceMetrics(registry);

  // Snapshot 1: all supervisors, no prior assignments.
  Cluster snapshot = new Cluster(nimbus, metrics, supervisors, new HashMap<>(), topologies, schedulerConf);
  final BlacklistScheduler blacklistScheduler = new BlacklistScheduler(new DefaultScheduler(), registry);
  blacklistScheduler.prepare(schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);

  // Snapshots 2 and 3: "sup-0" removed, carrying over the previous assignments.
  snapshot = new Cluster(nimbus, metrics,
      TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-0"),
      TestUtilsForBlacklistScheduler.assignmentMapToImpl(snapshot.getAssignments()),
      topologies, schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);
  snapshot = new Cluster(nimbus, metrics,
      TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supervisors, "sup-0"),
      TestUtilsForBlacklistScheduler.assignmentMapToImpl(snapshot.getAssignments()),
      topologies, schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);

  // Snapshot 4: full supervisor map again, still carrying over the previous assignments.
  snapshot = new Cluster(nimbus, metrics, supervisors,
      TestUtilsForBlacklistScheduler.assignmentMapToImpl(snapshot.getAssignments()),
      topologies, schedulerConf);
  blacklistScheduler.schedule(topologies, snapshot);

  Assert.assertEquals("blacklist", Collections.singleton("host-0"), snapshot.getBlacklistedHosts());
}

相关文章