本文整理了Java中org.apache.storm.Config.setDebug()
方法的一些代码示例,展示了Config.setDebug()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Config.setDebug()
方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:setDebug
暂无
代码示例来源:origin: apache/storm
public void setDebug(boolean isOn) {
setDebug(this, isOn);
}
代码示例来源:origin: apache/storm
private Config newConfig() {
Config config = new Config();
config.setDebug(true);
return config;
}
代码示例来源:origin: apache/storm
protected Config getConfig() {
Config config = new Config();
config.setDebug(true);
return config;
}
代码示例来源:origin: apache/storm
protected Config getConfig() {
Config config = new Config();
config.setDebug(true);
return config;
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
StormTopology topology = getStormTopology();
Config conf = new Config();
conf.setDebug(true);
String topoName = "test";
if (args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topology);
}
代码示例来源:origin: apache/storm
@Override
protected int run(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// example. spout1: generate random strings
// bolt1: get the first part of a string
// bolt2: output the tuple
// NOTE: Variable used in lambda expression should be final or effectively final
// (or it will cause compilation error),
// and variable type should implement the Serializable interface if it isn't primitive type
// (or it will cause not serializable exception).
Prefix prefix = new Prefix("Hello lambda:");
String suffix = ":so cool!";
int tag = 999;
builder.setSpout("spout1", () -> UUID.randomUUID().toString());
builder.setBolt("bolt1", (tuple, collector) -> {
String[] parts = tuple.getStringByField("lambda").split("\\-");
collector.emit(new Values(prefix + parts[0] + suffix, tag));
}, "strValue", "intValue").shuffleGrouping("spout1");
builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
return submit("lambda-demo", conf, builder);
}
}
代码示例来源:origin: apache/storm
protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException,
AuthorizationException, InterruptedException {
final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER;
final boolean isOpaque = args.length > 1 ? Boolean.parseBoolean(args[1]) : true;
System.out.println("Running with broker url " + brokerUrl + " and isOpaque=" + isOpaque);
Config tpConf = new Config();
tpConf.setDebug(true);
tpConf.setMaxSpoutPending(5);
// Producers
StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
// Consumer
KafkaTridentSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
ITridentDataSource spout = isOpaque ? newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig);
StormSubmitter.submitTopology("topics-consumer", tpConf,
TridentKafkaConsumerTopology.newTopology(spout));
}
}
代码示例来源:origin: apache/storm
conf.setDebug(true);
String topoName = "word-count";
if (args.length > 1) {
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(true);
String topoName = MultipleLoggerTopology.class.getName();
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
代码示例来源:origin: apache/storm
public void buildAndLaunchWordCountTopology(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
Config conf = new Config();
conf.setDebug(true);
try {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
throw new RuntimeException(exp);
}
}
代码示例来源:origin: apache/storm
public static void runStormTopology(LocalCluster cluster, final List<?> watchedList, final int expectedValueSize,
AbstractStreamsProcessor proc, StormTopology topo) throws Exception {
final Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.setDebug(true);
if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
}
try (LocalCluster.LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo)) {
waitForCompletion(1000 * 1000, () -> watchedList.size() < expectedValueSize);
} finally {
while (cluster.getClusterInfo().get_topologies_size() > 0) {
Thread.sleep(10);
}
Utils.resetClassLoaderForJavaDeSerialize();
}
}
代码示例来源:origin: apache/storm
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
conf.setDebug(true);
String topologyName = "test";
conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
}
return submit(topologyName, conf, builder);
}
代码示例来源:origin: apache/storm
protected int run(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
conf.setDebug(true);
String topologyName = "word-count";
conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
}
return submit(topologyName, conf, builder);
}
代码示例来源:origin: apache/storm
@Test
public void testSubmitTopologyToLocalNimbus() throws Exception {
int port = Utils.getAvailablePort();
try (ILocalCluster localCluster = new LocalCluster.Builder()
.withNimbusDaemon(true)
.withDaemonConf(Config.NIMBUS_THRIFT_PORT, port)
.build()) {
Config topoConf = new Config();
topoConf.putAll(Utils.readDefaultConfig());
topoConf.setDebug(true);
topoConf.put("storm.cluster.mode", "local"); // default is aways "distributed" but here local cluster is being used.
topoConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, InmemoryTopologySubmitterHook.class.getName());
topoConf.put(Config.NIMBUS_THRIFT_PORT, port);
List<TopologyDetails> topologyNames = new ArrayList<>();
for (int i = 0; i < 4; i++) {
final String topologyName = "word-count-" + UUID.randomUUID().toString();
final StormTopology stormTopology = createTestTopology();
topologyNames.add(new TopologyDetails(topologyName, stormTopology));
localCluster.submitTopology(topologyName, topoConf, stormTopology);
}
Assert.assertEquals(InmemoryTopologySubmitterHook.submittedTopologies, topologyNames);
}
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentence(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
String topoName = "word-count";
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomIntegerSpout());
builder.setBolt("partialsum", new StatefulSumBolt("partial"), 1).shuffleGrouping("spout");
builder.setBolt("printer", new PrinterBolt(), 2).shuffleGrouping("partialsum");
builder.setBolt("total", new StatefulSumBolt("total"), 1).shuffleGrouping("printer");
Config conf = new Config();
conf.setDebug(false);
String topoName = "test";
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomIntegerSpout());
builder.setBolt("sumbolt", new WindowSumBolt().withWindow(new Count(5), new Count(3))
.withMessageIdField("msgid"), 1).shuffleGrouping("spout");
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("sumbolt");
Config conf = new Config();
conf.setDebug(false);
//conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
String topoName = "test";
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
代码示例来源:origin: apache/storm
conf.setDebug(false);
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
BaseWindowedBolt bolt = new SlidingWindowSumBolt()
.withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
.withTimestampField("ts")
.withLag(new Duration(5, TimeUnit.SECONDS));
builder.setSpout("integer", new RandomIntegerSpout(), 1);
builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
Config conf = new Config();
conf.setDebug(true);
String topoName = "test";
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("integer", new RandomIntegerSpout(), 1);
builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
.shuffleGrouping("integer");
builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(Count.of(3)), 1)
.shuffleGrouping("slidingsum");
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
Config conf = new Config();
conf.setDebug(true);
String topoName = "test";
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
内容来源于网络,如有侵权,请联系作者删除!