org.apache.storm.Config.setDebug()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(167)

本文整理了Java中org.apache.storm.Config.setDebug()方法的一些代码示例,展示了Config.setDebug()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Config.setDebug()方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:setDebug

Config.setDebug介绍

暂无

代码示例

代码示例来源:origin: apache/storm

public void setDebug(boolean isOn) {
  setDebug(this, isOn);
}

代码示例来源:origin: apache/storm

private Config newConfig() {
  Config config = new Config();
  config.setDebug(true);
  return config;
}

代码示例来源:origin: apache/storm

protected Config getConfig() {
  Config config = new Config();
  config.setDebug(true);
  return config;
}

代码示例来源:origin: apache/storm

protected Config getConfig() {
  Config config = new Config();
  config.setDebug(true);
  return config;
}

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
 StormTopology topology = getStormTopology();
 Config conf = new Config();
 conf.setDebug(true);
 String topoName = "test";
 if (args.length > 0) {
  topoName = args[0];
 }
 conf.setNumWorkers(3);
 StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topology);
}

代码示例来源:origin: apache/storm

@Override
  protected int run(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // example. spout1: generate random strings
    // bolt1: get the first part of a string
    // bolt2: output the tuple

    // NOTE: Variable used in lambda expression should be final or effectively final
    // (or it will cause compilation error),
    // and variable type should implement the Serializable interface if it isn't primitive type
    // (or it will cause not serializable exception).
    Prefix prefix = new Prefix("Hello lambda:");
    String suffix = ":so cool!";
    int tag = 999;

    builder.setSpout("spout1", () -> UUID.randomUUID().toString());
    builder.setBolt("bolt1", (tuple, collector) -> {
      String[] parts = tuple.getStringByField("lambda").split("\\-");
      collector.emit(new Values(prefix + parts[0] + suffix, tag));
    }, "strValue", "intValue").shuffleGrouping("spout1");
    builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");

    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(2);

    return submit("lambda-demo", conf, builder);
  }
}

代码示例来源:origin: apache/storm

protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException,
    AuthorizationException, InterruptedException {
    final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER;
    final boolean isOpaque = args.length > 1 ? Boolean.parseBoolean(args[1]) : true;
    System.out.println("Running with broker url " + brokerUrl + " and isOpaque=" + isOpaque);

    Config tpConf = new Config();
    tpConf.setDebug(true);
    tpConf.setMaxSpoutPending(5);

    // Producers
    StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
    StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
    // Consumer
    KafkaTridentSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
    ITridentDataSource spout = isOpaque ? newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig);
    StormSubmitter.submitTopology("topics-consumer", tpConf,
      TridentKafkaConsumerTopology.newTopology(spout));
  }
}

代码示例来源:origin: apache/storm

conf.setDebug(true);
String topoName = "word-count";
if (args.length > 1) {

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("word", new TestWordSpout(), 10);
  builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word");
  builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
  Config conf = new Config();
  conf.setDebug(true);
  String topoName = MultipleLoggerTopology.class.getName();
  if (args != null && args.length > 0) {
    topoName = args[0];
  }
  conf.setNumWorkers(2);
  StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}

代码示例来源:origin: apache/storm

public void buildAndLaunchWordCountTopology(String[] args) {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new RandomSentenceSpout(), 5);
  builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
  builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
  Config conf = new Config();
  conf.setDebug(true);
  try {
    conf.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
  } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
    throw new RuntimeException(exp);
  }
}

代码示例来源:origin: apache/storm

public static void runStormTopology(LocalCluster cluster, final List<?> watchedList, final int expectedValueSize,
                  AbstractStreamsProcessor proc, StormTopology topo) throws Exception {
  final Config conf = new Config();
  conf.setMaxSpoutPending(20);
  conf.setDebug(true);
  if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
    CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
    Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
  }
  try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo)) {
    waitForCompletion(1000 * 1000, () -> watchedList.size() < expectedValueSize);
  } finally {
    while (cluster.getClusterInfo().get_topologies_size() > 0) {
      Thread.sleep(10);
    }
    Utils.resetClassLoaderForJavaDeSerialize();
  }
}

代码示例来源:origin: apache/storm

protected int run(String[] args) {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("word", new TestWordSpout(), 10);
  builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
  builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
  conf.setDebug(true);
  String topologyName = "test";
  conf.setNumWorkers(3);
  if (args != null && args.length > 0) {
    topologyName = args[0];
  }
  return submit(topologyName, conf, builder);
}

代码示例来源:origin: apache/storm

protected int run(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new RandomSentenceSpout(), 5);
  builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
  builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
  conf.setDebug(true);
  String topologyName = "word-count";
  conf.setNumWorkers(3);
  if (args != null && args.length > 0) {
    topologyName = args[0];
  }
  return submit(topologyName, conf, builder);
}

代码示例来源:origin: apache/storm

@Test
public void testSubmitTopologyToLocalNimbus() throws Exception {
  int port = Utils.getAvailablePort();
  try (ILocalCluster localCluster = new LocalCluster.Builder()
    .withNimbusDaemon(true)
    .withDaemonConf(Config.NIMBUS_THRIFT_PORT, port)
    .build()) {
    Config topoConf = new Config();
    topoConf.putAll(Utils.readDefaultConfig());
    topoConf.setDebug(true);
    topoConf.put("storm.cluster.mode", "local"); // default is aways "distributed" but here local cluster is being used.
    topoConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, InmemoryTopologySubmitterHook.class.getName());
    topoConf.put(Config.NIMBUS_THRIFT_PORT, port);
    List<TopologyDetails> topologyNames = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final String topologyName = "word-count-" + UUID.randomUUID().toString();
      final StormTopology stormTopology = createTestTopology();
      topologyNames.add(new TopologyDetails(topologyName, stormTopology));
      localCluster.submitTopology(topologyName, topoConf, stormTopology);
    }
    Assert.assertEquals(InmemoryTopologySubmitterHook.submittedTopologies, topologyNames);
  }
}

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new RandomSentence(), 5);
  builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
  builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
  Config conf = new Config();
  conf.setDebug(true);
  String topoName = "word-count";
  if (args != null && args.length > 0) {
    topoName = args[0];
  }
  conf.setNumWorkers(3);
  StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new RandomIntegerSpout());
  builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout");
  builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum");
  builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
  Config conf = new Config();
  conf.setDebug(false);
  String topoName = "test";
  if (args != null && args.length > 0) {
    topoName = args[0];
  }
  conf.setNumWorkers(1);
  StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new RandomIntegerSpout());
  builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3))
                         .withMessageIdField("msgid"), 1).shuffleGrouping("spout");
  builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt");
  Config conf = new Config();
  conf.setDebug(false);
  //conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
  String topoName = "test";
  if (args != null && args.length > 0) {
    topoName = args[0];
  }
  conf.setNumWorkers(1);
  StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}

代码示例来源:origin: apache/storm

conf.setDebug(false);

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    BaseWindowedBolt bolt = new SlidingWindowSumBolt()
      .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
      .withTimestampField("ts")
      .withLag(new Duration(5, TimeUnit.SECONDS));
    builder.setSpout("integer", new RandomIntegerSpout(), 1);
    builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
    builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
    Config conf = new Config();
    conf.setDebug(true);
    String topoName = "test";

    if (args != null && args.length > 0) {
      topoName = args[0];
    }

    conf.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
  }
}

代码示例来源:origin: apache/storm

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("integer", new RandomIntegerSpout(), 1);
  builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
      .shuffleGrouping("integer");
  builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(Count.of(3)), 1)
      .shuffleGrouping("slidingsum");
  builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
  Config conf = new Config();
  conf.setDebug(true);
  String topoName = "test";
  if (args != null && args.length > 0) {
    topoName = args[0];
  }
  conf.setNumWorkers(1);
  StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}

相关文章