本文整理了Java中org.apache.storm.utils.Utils.findAndReadConfigFile()方法的一些代码示例,展示了Utils.findAndReadConfigFile()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Utils.findAndReadConfigFile()方法的具体详情如下:
包路径:org.apache.storm.utils.Utils
类名称:Utils
方法名:findAndReadConfigFile
方法描述:暂无
代码示例来源:origin: apache/storm
/**
 * Finds and reads the named config file by delegating to the two-argument
 * overload with {@code true} as the second argument.
 *
 * @param name name of the config file to find and read
 * @return the result of the two-argument overload for {@code (name, true)}
 */
public static Map<String, Object> findAndReadConfigFile(String name) {
    // NOTE(review): the flag is presumably "mustExist" -- confirm against the overload.
    final boolean flag = true;
    return findAndReadConfigFile(name, flag);
}
代码示例来源:origin: apache/storm
/**
 * Reads the {@code defaults.yaml} config file.
 *
 * @return the map produced by {@code findAndReadConfigFile("defaults.yaml", true)}
 */
public static Map<String, Object> readDefaultConfig() {
    final String defaultsFile = "defaults.yaml";
    return findAndReadConfigFile(defaultsFile, true);
}
代码示例来源:origin: apache/storm
/**
 * Reads the named config file and runs field validation on the result.
 *
 * @param name      name of the config file to find and read
 * @param mustExist forwarded unchanged to {@code Utils.findAndReadConfigFile}
 * @return the loaded config, after it has been passed to
 *         {@code ConfigValidation.validateFields}
 */
public static Map<String, Object> readYamlConfig(String name, boolean mustExist) {
    final Map<String, Object> loaded = Utils.findAndReadConfigFile(name, mustExist);
    ConfigValidation.validateFields(loaded);
    return loaded;
}
代码示例来源:origin: apache/storm
/**
 * Builds the effective config by layering three sources, later ones
 * overriding earlier ones: the default config, then the site config file,
 * then the command-line options.
 *
 * <p>The site config file is the one named by the {@code storm.conf.file}
 * system property when that property is set and non-empty; otherwise
 * {@code storm.yaml} is used.
 *
 * @return the merged config map
 */
public static Map<String, Object> readStormConfig() {
    final Map<String, Object> merged = readDefaultConfig();

    final String override = System.getProperty("storm.conf.file");
    final boolean hasOverride = override != null && !override.equals("");
    // An explicitly named file is read with flag true, the fallback storm.yaml with false.
    final Map<String, Object> siteConf = hasOverride
        ? findAndReadConfigFile(override, true)
        : findAndReadConfigFile("storm.yaml", false);

    merged.putAll(siteConf);
    merged.putAll(readCommandLineOpts());
    return merged;
}
代码示例来源:origin: apache/storm
if ((now - 5000) > _lastUpdate || _acl == null) {
Map<String, AclFunctionEntry> acl = new HashMap<>();
Map<String, Object> conf = Utils.findAndReadConfigFile(_aclFileName);
if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) {
Map<String, Map<String, ?>> confAcl =
代码示例来源:origin: apache/storm
/**
 * Entry point. Per the original author's note, the topology copies text file
 * content from sourceDir to destinationDir and moves source files once they
 * have been consumed.
 *
 * <p>Requires exactly two arguments: the run duration in seconds and the
 * topology config file. Anything else prints a usage line to stderr and returns.
 *
 * @param args {@code runDurationSec topConfFile}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    final boolean wrongArgCount = args.length != 2;
    if (wrongArgCount) {
        System.err.println("args: runDurationSec topConfFile");
        return;
    }

    Integer durationSec = Integer.valueOf(args[0]);
    Map<String, Object> topoConf = Utils.findAndReadConfigFile(args[1]);

    // Fixed settings applied on top of whatever the config file supplied.
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Hand the topology to the Storm cluster.
    Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
代码示例来源:origin: apache/storm
Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
ret = (Map<String, Number>) fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
if (ret != null) {
代码示例来源:origin: apache/storm
/**
 * Entry point for the "LowThroughputTopo" topology.
 *
 * <p>Arguments are positional and all optional:
 * <ol>
 *   <li>{@code spoutSleepMs} - stored in the topology config under {@code SLEEP_MS}</li>
 *   <li>{@code runDurationSec} - run time passed to the helper (defaults to -1)</li>
 *   <li>{@code optionalConfFile} - config file loaded as the base topology config</li>
 * </ol>
 *
 * <p>Previously {@code args[1]} was read unconditionally as the config file
 * name (throwing {@link ArrayIndexOutOfBoundsException} with fewer than two
 * arguments) and was then also parsed as the run duration. The config file
 * now has its own optional third position, and the argument count is checked
 * before anything is read.
 *
 * @param args {@code spoutSleepMs [runDurationSec] [optionalConfFile]}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    // Validate first so a bad invocation never touches the file system or the cluster.
    if (args.length > 3) {
        System.err.println("args: spoutSleepMs [runDurationSec] [optionalConfFile]");
        return;
    }

    int runTime = -1;
    Map<String, Object> topoConf = new Config();
    if (args.length > 2) {
        // Load the file before applying the fixed settings so those settings still win.
        topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
    }
    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 1);

    if (args.length > 0) {
        long sleepMs = Integer.parseInt(args[0]);
        topoConf.put(SLEEP_MS, sleepMs);
    }
    if (args.length > 1) {
        runTime = Integer.parseInt(args[1]);
    }

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, "LowThroughputTopo", topoConf, getTopology(topoConf));
}
代码示例来源:origin: apache/storm
/**
 * Entry point. Per the original author's note, the spout generates random
 * strings and an HDFS bolt writes them to a text file.
 *
 * <p>Both arguments are optional: the run duration in seconds (default -1,
 * i.e. run until Ctrl-C) and the config file (default
 * {@code conf/HdfsSpoutTopo.yaml}). More than two arguments prints a usage
 * line to stderr and returns.
 *
 * @param args {@code [runDurationSec] [confFile]}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    // -1 means: run until Ctrl-C
    final int runTime = (args.length > 0) ? Integer.parseInt(args[0]) : -1;
    final String confFile = (args.length > 1) ? args[1] : "conf/HdfsSpoutTopo.yaml";

    if (args.length > 2) {
        System.err.println("args: [runDurationSec] [confFile]");
        return;
    }

    final Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);

    // Fixed settings applied on top of whatever the config file supplied.
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Submit to Storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
代码示例来源:origin: apache/storm
Map<String, Object> fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
raw = (Map<String, Map<String, Number>>) fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
if (raw != null) {
代码示例来源:origin: apache/storm
/**
 * Starts the topology.
 *
 * <p>Both arguments are optional: the run duration in seconds (default -1)
 * and a config file whose entries are copied into the topology config. More
 * than two arguments prints a usage line to stderr and returns.
 *
 * @param args {@code [runDurationSec] [optionalConfFile]}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    final Config topoConf = new Config();
    final int durationSec = (args.length > 0) ? Integer.parseInt(args[0]) : -1;

    final boolean hasConfFile = args.length > 1;
    if (hasConfFile) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    // Note: the argument-count check intentionally stays after the parsing above.
    if (args.length > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }

    // Submit to Storm cluster
    Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
代码示例来源:origin: apache/storm
/**
 * Loads {@code user-resource-pools.yaml}, logs the resulting map, and runs it
 * through {@code ConfigValidation.validateFields}.
 */
@Test
public void testReadInResourceAwareSchedulerUserPools() {
    final String poolsFile = "user-resource-pools.yaml";
    final Map<String, Object> userPools = Utils.findAndReadConfigFile(poolsFile, false);
    LOG.info("fromFile: {}", userPools);
    ConfigValidation.validateFields(userPools);
}
代码示例来源:origin: apache/storm
Map<String, Object> conf = Utils.findAndReadConfigFile(args[0]);
Configuration configuration = new Configuration();
代码示例来源:origin: apache/storm
/**
 * Migrates offsets from the Zookeeper store used by the storm-kafka Trident
 * spouts to the Zookeeper store used by the storm-kafka-clients Trident spout.
 *
 * <p>Requires exactly one argument, the config file. With any other argument
 * count a usage line is printed to stderr and the JVM exits with status 1.
 *
 * @param args {@code confFile}
 * @throws Exception propagated from config loading or the migration steps
 */
public static void main(String[] args) throws Exception {
    if (args.length != 1) {
        System.err.println("Args: confFile");
        System.exit(1);
    }

    final Map<String, Object> rawConf = Utils.findAndReadConfigFile(args[0]);

    // Copy every required key out of the raw map; getOrError is expected to fail on a missing key.
    final Configuration settings = new Configuration();
    settings.zkHosts = MapUtil.getOrError(rawConf, "zookeeper.servers");
    settings.zkRoot = MapUtil.getOrError(rawConf, "zookeeper.root");
    settings.txId = MapUtil.getOrError(rawConf, "txid");
    settings.topic = MapUtil.getOrError(rawConf, "topic");
    settings.isWildcardTopic = MapUtil.getOrError(rawConf, "is.wildcard.topic");
    settings.newTopologyTxId = MapUtil.getOrError(rawConf, "new.topology.txid");
    settings.zkSessionTimeoutMs = MapUtil.getOrError(rawConf, "zookeeper.session.timeout.ms");
    settings.zkConnectionTimeoutMs = MapUtil.getOrError(rawConf, "zookeeper.connection.timeout.ms");
    settings.zkRetryTimes = MapUtil.getOrError(rawConf, "zookeeper.retry.times");
    settings.zkRetryIntervalMs = MapUtil.getOrError(rawConf, "zookeeper.retry.interval.ms");

    // The curator client is closed automatically when the block exits.
    try (CuratorFramework zkClient = newCurator(settings)) {
        zkClient.start();
        final Map<TopicPartition, Map<Long, PartitionMetadata>> pendingOffsets =
            getOffsetsToMigrate(zkClient, settings);
        LOG.info("Migrating offsets {}", pendingOffsets);
        migrateOffsets(zkClient, settings, pendingOffsets);
        migrateCoordinator(zkClient, settings, new ArrayList<>(pendingOffsets.keySet()));
    }
}
代码示例来源:origin: apache/storm
/**
 * Entry point for the ConstSpout-only topology (no bolts).
 *
 * <p>Both arguments are optional: the run duration in seconds (default -1)
 * and a config file whose entries are copied into the topology config. More
 * than two arguments prints a usage line to stderr and returns.
 *
 * @param args {@code [runDurationSec] [optionalConfFile]}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    final Config topoConf = new Config();
    final int runTime = (args.length > 0) ? Integer.parseInt(args[0]) : -1;

    final boolean hasConfFile = args.length > 1;
    if (hasConfFile) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    // Fixed settings applied on top of whatever the config file supplied.
    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Note: the argument-count check intentionally stays after the work above.
    if (args.length > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }

    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology());
}
}
代码示例来源:origin: apache/storm
/**
 * Entry point. Requires exactly two arguments: the run duration in seconds
 * and the topology config file. Anything else prints a usage line to stderr
 * and returns.
 *
 * @param args {@code runDurationSec topConfFile}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    final boolean wrongArgCount = args.length != 2;
    if (wrongArgCount) {
        System.err.println("args: runDurationSec topConfFile");
        return;
    }

    final Integer durationSec = Integer.valueOf(args[0]);

    final Config topoConf = new Config();
    topoConf.putAll(Utils.findAndReadConfigFile(args[1]));

    // Fixed settings applied on top of whatever the config file supplied.
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Submit to Storm cluster
    Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/storm
/**
 * Entry point. Both arguments are optional: the run duration in seconds
 * (default -1) and a config file whose entries are copied into the topology
 * config. More than two arguments prints a usage line to stderr and returns.
 *
 * @param args {@code [runDurationSec] [optionalConfFile]}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    final Config topoConf = new Config();
    final int runTime = (args.length > 0) ? Integer.parseInt(args[0]) : -1;

    final boolean hasConfFile = args.length > 1;
    if (hasConfFile) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    // Fixed settings applied on top of whatever the config file supplied.
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Note: the argument-count check intentionally stays after the work above.
    if (args.length > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }

    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/storm
/**
 * Entry point for the topology that reads its input from a file.
 *
 * <p>Arguments are positional:
 * <ol>
 *   <li>{@code file.txt} - required; stored in the topology config under {@code INPUT_FILE}</li>
 *   <li>{@code runDurationSec} - optional run time passed to the helper (defaults to -1)</li>
 *   <li>{@code optionalConfFile} - optional config file merged into the topology config</li>
 * </ol>
 * Zero arguments or more than three prints a usage line to stderr and returns.
 *
 * <p>Previously the input file and duration were only honoured when all three
 * arguments were supplied, and the config file was read from {@code args[1]}
 * (the duration) instead of {@code args[2]}. Each argument is now handled at
 * its own position, and the argument count is checked before anything is read.
 *
 * @param args {@code file.txt [runDurationSec] [optionalConfFile]}
 * @throws Exception propagated from config loading or topology submission
 */
public static void main(String[] args) throws Exception {
    // Validate first so a bad invocation never touches the file system or the cluster.
    if (args.length == 0 || args.length > 3) {
        System.err.println("args: file.txt [runDurationSec] [optionalConfFile]");
        return;
    }

    int runTime = -1;
    Config topoConf = new Config();
    topoConf.put(INPUT_FILE, args[0]);
    if (args.length > 1) {
        runTime = Integer.parseInt(args[1]);
    }
    if (args.length > 2) {
        // The config file is the third argument; args[1] is the run duration.
        topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
    }

    // Fixed settings applied on top of whatever the config file supplied.
    topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
    topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
    topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
    topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
    topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);

    // Command-line options are merged last so they override everything above.
    topoConf.putAll(Utils.readCommandLineOpts());

    // Submit topology to storm cluster
    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
代码示例来源:origin: apache/metron
/**
 * Builds the Storm config from the default config overlaid with the site
 * config file, without merging any command-line options.
 *
 * <p>The site config file is the one named by the {@code storm.conf.file}
 * system property when that property is set and non-empty; otherwise
 * {@code storm.yaml} is used.
 *
 * @return the merged config map
 */
private Map readStormConfigWithoutCLI() {
    Map merged = Utils.readDefaultConfig();

    String override = System.getProperty("storm.conf.file");
    boolean hasOverride = override != null && !override.equals("");
    // An explicitly named file is read with flag true, the fallback storm.yaml with false.
    Map siteConf = hasOverride
        ? Utils.findAndReadConfigFile(override, true)
        : Utils.findAndReadConfigFile("storm.yaml", false);

    merged.putAll(siteConf);
    return merged;
}
代码示例来源:origin: DigitalPebble/storm-crawler
/**
 * Prepares and runs the given topology: loads {@code crawler-default.yaml},
 * merges the element extracted from it into the topology's config, lets the
 * topology parse the command-line arguments, then runs it with whatever
 * arguments remain.
 *
 * @param topology the topology to configure and run
 * @param args     command-line arguments handed to {@code topology.parse}
 */
public static void start(ConfigurableTopology topology, String[] args) {
    // Load the default configuration file.
    final String defaultsFile = "crawler-default.yaml";
    Map defaults = Utils.findAndReadConfigFile(defaultsFile, false);
    topology.conf.putAll(ConfUtils.extractConfigElement(defaults));

    String[] leftover = topology.parse(args);
    topology.run(leftover);
}
内容来源于网络,如有侵权,请联系作者删除!