本文整理了Java中org.apache.storm.Config
类的一些代码示例,展示了Config
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Config
类的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
[英]Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing setter methods for all the configs that can be set. It also makes it easier to do things like add serializations. This class also provides constants for all the configurations possible on a Storm cluster and Storm topology. Each constant is paired with an annotation that defines the validity criterion of the corresponding field. Default values for these configs can be found in defaults.yaml. Note that you may put other configurations in any of the configs. Storm will ignore anything it doesn't recognize, but your topologies are free to make use of them by reading them in the prepare method of Bolts or the open method of Spouts.
[中]拓扑配置被指定为普通的旧映射。此类通过为所有可以设置的配置提供setter方法,提供了创建拓扑配置映射的方便方法。它还使添加序列化等操作变得更容易。此类还为Storm群集和Storm拓扑上所有可能的配置提供常量。每个常量与定义相应字段有效性标准的注释配对。这些配置的默认值可以在默认值中找到。亚马尔。请注意,您可以在任何配置中放置其他配置。Storm将忽略任何它无法识别的内容,但您的拓扑可以通过在螺栓准备方法或喷口打开方法中读取它们来自由使用它们。
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
StormTopology topology = buildVehiclesTopology();
Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("args: runDurationSec topConfFile");
return;
}
final Integer durationSec = Integer.parseInt(args[0]);
Config topoConf = new Config();
topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
topoConf.putAll(Utils.readCommandLineOpts());
// Submit to Storm cluster
Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/storm
private Config newConfig() {
Config config = new Config();
config.setDebug(true);
return config;
}
代码示例来源:origin: apache/storm
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}
代码示例来源:origin: apache/storm
/**
* {@inheritDoc}
*/
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
// add tick tuple each second to force acknowledgement of pending tuples.
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
return conf;
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentence(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
String topoName = "word-count";
if (args != null && args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new InOrderSpout(), 8);
builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1"));
Config conf = new Config();
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
String name = "in-order-test";
if (args != null && args.length > 0) {
name = args[0];
}
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
Map<String, Object> clusterConf = Utils.readStormConfig();
clusterConf.putAll(Utils.readCommandLineOpts());
Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
//Sleep for 50 mins
for (int i = 0; i < 50; i++) {
Thread.sleep(30 * 1000);
printMetrics(client, name);
}
kill(client, name);
}
代码示例来源:origin: apache/storm
protected int run(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
conf.setDebug(true);
String topologyName = "word-count";
conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
}
return submit(topologyName, conf, builder);
}
代码示例来源:origin: apache/storm
Config conf = new Config();
conf.setNumWorkers(1);
conf.setNumAckers(1);
conf.setMaxTaskParallelism(1);
conf.setDebug(true);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, spout, spoutNum);
builder.setBolt(BOLT_ID, bolt, 1).shuffleGrouping(SPOUT_ID);
Map<String, Object> clusterConf = Utils.readStormConfig();
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
代码示例来源:origin: apache/storm
protected int run(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 4);
builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setMaxTaskParallelism(3);
String topologyName = "word-count";
conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
}
return submit(topologyName, conf, builder);
}
代码示例来源:origin: apache/storm
private GeneralTopologyContext getContext(final Fields fields) {
TopologyBuilder builder = new TopologyBuilder();
return new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return fields;
}
};
}
代码示例来源:origin: apache/storm
@Test
public void testDisableTupleTimeout() throws Exception {
Config daemonConf = new Config();
daemonConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
try (LocalCluster cluster = new LocalCluster.Builder()
.withDaemonConf(daemonConf)
.withSimulatedTime()
.build()) {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new AckEveryOtherBolt()));
Config stormConf = new Config();
stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);
stormConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
代码示例来源:origin: apache/storm
/**
* Initialize a fake config.
* @return conf
*/
private static Config initializedConfig() {
Config conf = new Config();
conf.putAll(Utils.readDefaultConfig());
ArrayList<String> nimbusSeeds = new ArrayList<>();
nimbusSeeds.add(NIMBUS_HOST);
conf.put(Config.NIMBUS_SEEDS, nimbusSeeds);
conf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
return conf;
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);
// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory =
new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
String topoName = "wordCounterWithWindowing";
if (args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(windowStoreFactory));
}
代码示例来源:origin: apache/storm
/**
* The example's main method.
* @param args the command line arguments
* @throws AlreadyAliveException if the topology is already started
* @throws InvalidTopologyException if the topology is invalid
* @throws AuthorizationException if the topology authorization fails
*/
public static void main(final String[] args) throws AlreadyAliveException,
InvalidTopologyException,
AuthorizationException {
int batchSize = BATCH_SIZE_DEFAULT;
FixedBatchSpout spout = new FixedBatchSpout(batchSize);
spout.cycle = true;
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout", spout);
EsConfig esConfig = new EsConfig("http://localhost:9300");
Fields esFields = new Fields("index", "type", "source");
EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory,
esFields,
new EsUpdater(),
new Fields());
EsTestUtil.startEsNode();
EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT_SECS);
StormSubmitter.submitTopology(TOPOLOGY_NAME,
new Config(),
topology.build());
}
代码示例来源:origin: apache/storm
@Test
public void testSubmitTopologyToLocalNimbus() throws Exception {
int port = Utils.getAvailablePort();
try (ILocalCluster localCluster = new LocalCluster.Builder()
.withNimbusDaemon(true)
.withDaemonConf(Config.NIMBUS_THRIFT_PORT, port)
.build()) {
Config topoConf = new Config();
topoConf.putAll(Utils.readDefaultConfig());
topoConf.setDebug(true);
topoConf.put("storm.cluster.mode", "local"); // default is aways "distributed" but here local cluster is being used.
topoConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, InmemoryTopologySubmitterHook.class.getName());
topoConf.put(Config.NIMBUS_THRIFT_PORT, port);
List<TopologyDetails> topologyNames = new ArrayList<>();
for (int i = 0; i < 4; i++) {
final String topologyName = "word-count-" + UUID.randomUUID().toString();
final StormTopology stormTopology = createTestTopology();
topologyNames.add(new TopologyDetails(topologyName, stormTopology));
localCluster.submitTopology(topologyName, topoConf, stormTopology);
}
Assert.assertEquals(InmemoryTopologySubmitterHook.submittedTopologies, topologyNames);
}
}
代码示例来源:origin: apache/storm
public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
int launchTime, int priority, String user, double maxHeapSize) {
Config conf = new Config();
conf.putAll(config);
conf.put(Config.TOPOLOGY_PRIORITY, priority);
conf.put(Config.TOPOLOGY_NAME, name);
conf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
0, genExecsAndComps(topology), launchTime, user);
return topo;
}
代码示例来源:origin: apache/storm
public static void main(String[] args) throws Exception {
StormTopology topology = getStormTopology();
Config conf = new Config();
conf.setDebug(true);
String topoName = "test";
if (args.length > 0) {
topoName = args[0];
}
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topology);
}
代码示例来源:origin: apache/storm
public static void prepare() {
Config conf = new Config();
conf.putAll(Utils.readStormConfig());
store = Utils.getClientBlobStore(conf);
}
代码示例来源:origin: apache/storm
/**
* Given a topology definition, return a populated `org.apache.storm.Config` instance.
* @param topologyDef topology definition
* @return a Storm Config object
*/
public static Config buildConfig(TopologyDef topologyDef) {
// merge contents of `config` into topology config
Config conf = new Config();
conf.putAll(topologyDef.getConfig());
return conf;
}
内容来源于网络,如有侵权,请联系作者删除!