本文整理了Java中org.apache.flink.configuration.Configuration.setString()
方法的一些代码示例,展示了Configuration.setString()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.setString()
方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:setString
[英]Adds the given key/value pair to the configuration object.
[中]将给定的键/值对添加到配置对象。
代码示例来源:origin: apache/flink
public static org.apache.flink.configuration.Configuration populateFlinkSecureConfigurations(
@Nullable org.apache.flink.configuration.Configuration flinkConf) {
org.apache.flink.configuration.Configuration conf;
if (flinkConf == null) {
conf = new org.apache.flink.configuration.Configuration();
} else {
conf = flinkConf;
}
conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , testKeytab);
conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , testPrincipal);
return conf;
}
代码示例来源:origin: apache/flink
@Test
public void testBindAddressFirstDeprecatedKey() {
final Configuration configuration = new Configuration();
final String expectedAddress = "foobar";
configuration.setString("web.address", expectedAddress);
final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);
assertThat(actualAddress, is(equalTo(expectedAddress)));
}
代码示例来源:origin: apache/flink
/**
* Tests that the configuration is properly passed via the DefaultCLI to the
* created ClusterDescriptor.
*/
@Test
public void testConfigurationPassing() throws Exception {
final Configuration configuration = getConfiguration();
final String localhost = "localhost";
final int port = 1234;
configuration.setString(JobManagerOptions.ADDRESS, localhost);
configuration.setInteger(JobManagerOptions.PORT, port);
@SuppressWarnings("unchecked")
final AbstractCustomCommandLine<StandaloneClusterId> defaultCLI =
(AbstractCustomCommandLine<StandaloneClusterId>) getCli(configuration);
final String[] args = {};
CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
final ClusterDescriptor<StandaloneClusterId> clusterDescriptor =
defaultCLI.createClusterDescriptor(commandLine);
final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo();
assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(localhost));
assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(port));
}
代码示例来源:origin: apache/flink
@Test
public void testBindAddressSecondDeprecatedKey() {
final Configuration configuration = new Configuration();
final String expectedAddress = "foobar";
configuration.setString("jobmanager.web.address", expectedAddress);
final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);
assertThat(actualAddress, is(equalTo(expectedAddress)));
}
}
代码示例来源:origin: apache/flink
/**
* Tests that command line options override the configuration settings.
*/
@Test
public void testManualConfigurationOverride() throws Exception {
final String localhost = "localhost";
final int port = 1234;
final Configuration configuration = getConfiguration();
configuration.setString(JobManagerOptions.ADDRESS, localhost);
configuration.setInteger(JobManagerOptions.PORT, port);
@SuppressWarnings("unchecked")
final AbstractCustomCommandLine<StandaloneClusterId> defaultCLI =
(AbstractCustomCommandLine<StandaloneClusterId>) getCli(configuration);
final String manualHostname = "123.123.123.123";
final int manualPort = 4321;
final String[] args = {"-m", manualHostname + ':' + manualPort};
CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
final ClusterDescriptor<StandaloneClusterId> clusterDescriptor =
defaultCLI.createClusterDescriptor(commandLine);
final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo();
assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(manualHostname));
assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(manualPort));
}
代码示例来源:origin: apache/flink
/**
* Specify a custom {@code Configuration} that will be used when creating
* the {@link FileSystem} for writing.
*/
public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
this.fsConfig = new Configuration();
for (Map.Entry<String, String> entry : config) {
fsConfig.setString(entry.getKey(), entry.getValue());
}
return this;
}
代码示例来源:origin: apache/flink
@Test
public void testGetEnvironmentVariablesErroneous() {
Configuration testConf = new Configuration();
testConf.setString("yarn.application-master.env.", "/usr/lib/native");
Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
Assert.assertEquals(0, res.size());
}
代码示例来源:origin: apache/flink
@Test
public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
File rootFolder = tempFolder.getRoot();
zkServer.getConnectString(),
rootFolder.getPath());
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
代码示例来源:origin: apache/flink
/**
* Specify a custom {@code Configuration} that will be used when creating
* the {@link FileSystem} for writing.
*/
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
this.fsConfig = new Configuration();
for (Map.Entry<String, String> entry : config) {
fsConfig.setString(entry.getKey(), entry.getValue());
}
return this;
}
代码示例来源:origin: apache/flink
@Test
public void testCopyConstructor() {
try {
final String key = "theKey";
Configuration cfg1 = new Configuration();
cfg1.setString(key, "value");
Configuration cfg2 = new Configuration(cfg1);
cfg2.setString(key, "another value");
assertEquals("value", cfg1.getString(key, ""));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
@Test
public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
int numJMs = 10;
zkServer.getConnectString(),
rootFolder.getPath());
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
代码示例来源:origin: apache/flink
/**
* Returns a {@link Configuration} object from this {@link ParameterTool}.
*
* @return A {@link Configuration}
*/
public Configuration getConfiguration() {
final Configuration conf = new Configuration();
for (Map.Entry<String, String> entry : data.entrySet()) {
conf.setString(entry.getKey(), entry.getValue());
}
return conf;
}
代码示例来源:origin: apache/flink
@Test
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
false,
JobMaster.JOB_MANAGER_NAME);
}
代码示例来源:origin: apache/flink
@Test
public void testSavepointForJobWithIteration() throws Exception {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
代码示例来源:origin: apache/flink
/**
* Creates a new {@link Configuration} from the given {@link Properties}.
*
* @param properties to convert into a {@link Configuration}
* @return {@link Configuration} which has been populated by the values of the given {@link Properties}
*/
@Nonnull
public static Configuration createConfiguration(Properties properties) {
final Configuration configuration = new Configuration();
final Set<String> propertyNames = properties.stringPropertyNames();
for (String propertyName : propertyNames) {
configuration.setString(propertyName, properties.getProperty(propertyName));
}
return configuration;
}
代码示例来源:origin: apache/flink
@Test (expected = RuntimeException.class)
public void testSetFileViaConfigurationEmptyPath() {
final DummyFileInputFormat format = new DummyFileInputFormat();
final String filePath = null;
Configuration conf = new Configuration();
conf.setString("input.file.path", filePath);
format.configure(conf);
}
代码示例来源:origin: apache/flink
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
return config;
}
代码示例来源:origin: apache/flink
@Test
public void testGetEnvironmentVariables() {
Configuration testConf = new Configuration();
testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
Assert.assertEquals(1, res.size());
Map.Entry<String, String> entry = res.entrySet().iterator().next();
Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
Assert.assertEquals("/usr/lib/native", entry.getValue());
}
代码示例来源:origin: apache/flink
protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
return flinkConfig;
}
代码示例来源:origin: apache/flink
@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
Configuration conf = new Configuration();
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
内容来源于网络,如有侵权,请联系作者删除!