org.apache.flink.configuration.Configuration.setString()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(277)

本文整理了Java中org.apache.flink.configuration.Configuration.setString()方法的一些代码示例,展示了Configuration.setString()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.setString()方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:setString

Configuration.setString介绍

[英]Adds the given key/value pair to the configuration object.
[中]将给定的键/值对添加到配置对象。

代码示例

代码示例来源:origin: apache/flink

public static org.apache.flink.configuration.Configuration populateFlinkSecureConfigurations(
    @Nullable org.apache.flink.configuration.Configuration flinkConf) {
  org.apache.flink.configuration.Configuration conf;
  if (flinkConf == null) {
    conf = new org.apache.flink.configuration.Configuration();
  } else {
    conf = flinkConf;
  }
  conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , testKeytab);
  conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , testPrincipal);
  return conf;
}

代码示例来源:origin: apache/flink

@Test
public void testBindAddressFirstDeprecatedKey() {
  final Configuration configuration = new Configuration();
  final String expectedAddress = "foobar";
  configuration.setString("web.address", expectedAddress);
  final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);
  assertThat(actualAddress, is(equalTo(expectedAddress)));
}

代码示例来源:origin: apache/flink

/**
 * Tests that the configuration is properly passed via the DefaultCLI to the
 * created ClusterDescriptor.
 */
@Test
public void testConfigurationPassing() throws Exception {
  final Configuration configuration = getConfiguration();
  final String localhost = "localhost";
  final int port = 1234;
  configuration.setString(JobManagerOptions.ADDRESS, localhost);
  configuration.setInteger(JobManagerOptions.PORT, port);
  @SuppressWarnings("unchecked")
  final AbstractCustomCommandLine<StandaloneClusterId> defaultCLI =
    (AbstractCustomCommandLine<StandaloneClusterId>) getCli(configuration);
  final String[] args = {};
  CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
  final ClusterDescriptor<StandaloneClusterId> clusterDescriptor =
    defaultCLI.createClusterDescriptor(commandLine);
  final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
  final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo();
  assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(localhost));
  assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(port));
}

代码示例来源:origin: apache/flink

@Test
  public void testBindAddressSecondDeprecatedKey() {
    final Configuration configuration = new Configuration();
    final String expectedAddress = "foobar";
    configuration.setString("jobmanager.web.address", expectedAddress);

    final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);

    assertThat(actualAddress, is(equalTo(expectedAddress)));
  }
}

代码示例来源:origin: apache/flink

/**
 * Tests that command line options override the configuration settings.
 */
@Test
public void testManualConfigurationOverride() throws Exception {
  final String localhost = "localhost";
  final int port = 1234;
  final Configuration configuration = getConfiguration();
  configuration.setString(JobManagerOptions.ADDRESS, localhost);
  configuration.setInteger(JobManagerOptions.PORT, port);
  @SuppressWarnings("unchecked")
  final AbstractCustomCommandLine<StandaloneClusterId> defaultCLI =
    (AbstractCustomCommandLine<StandaloneClusterId>) getCli(configuration);
  final String manualHostname = "123.123.123.123";
  final int manualPort = 4321;
  final String[] args = {"-m", manualHostname + ':' + manualPort};
  CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
  final ClusterDescriptor<StandaloneClusterId> clusterDescriptor =
    defaultCLI.createClusterDescriptor(commandLine);
  final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
  final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo();
  assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(manualHostname));
  assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(manualPort));
}

代码示例来源:origin: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
  this.fsConfig = new Configuration();
  for (Map.Entry<String, String> entry : config) {
    fsConfig.setString(entry.getKey(), entry.getValue());
  }
  return this;
}

代码示例来源:origin: apache/flink

@Test
public void testGetEnvironmentVariablesErroneous() {
  Configuration testConf = new Configuration();
  testConf.setString("yarn.application-master.env.", "/usr/lib/native");
  Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
  Assert.assertEquals(0, res.size());
}

代码示例来源:origin: apache/flink

@Test
public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
  File rootFolder = tempFolder.getRoot();
    zkServer.getConnectString(),
    rootFolder.getPath());
  configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());

代码示例来源:origin: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
  this.fsConfig = new Configuration();
  for (Map.Entry<String, String> entry : config) {
    fsConfig.setString(entry.getKey(), entry.getValue());
  }
  return this;
}

代码示例来源:origin: apache/flink

@Test
public void testCopyConstructor() {
  try {
    final String key = "theKey";
    Configuration cfg1 = new Configuration();
    cfg1.setString(key, "value");
    Configuration cfg2 = new Configuration(cfg1);
    cfg2.setString(key, "another value");
    assertEquals("value", cfg1.getString(key, ""));
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
  int numJMs = 10;
    zkServer.getConnectString(),
    rootFolder.getPath());
  configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
  configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());

代码示例来源:origin: apache/flink

/**
 * Returns a {@link Configuration} object from this {@link ParameterTool}.
 *
 * @return A {@link Configuration}
 */
public Configuration getConfiguration() {
  final Configuration conf = new Configuration();
  for (Map.Entry<String, String> entry : data.entrySet()) {
    conf.setString(entry.getKey(), entry.getValue());
  }
  return conf;
}

代码示例来源:origin: apache/flink

@Test
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
  Configuration config = new Configuration();
  config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
  config.setInteger(JobManagerOptions.PORT, 17234);
  StandaloneUtils.createLeaderRetrievalService(
    config,
    false,
    JobMaster.JOB_MANAGER_NAME);
}

代码示例来源:origin: apache/flink

@Test
public void testSavepointForJobWithIteration() throws Exception {
  config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

代码示例来源:origin: apache/flink

/**
 * Creates a new {@link Configuration} from the given {@link Properties}.
 *
 * @param properties to convert into a {@link Configuration}
 * @return {@link Configuration} which has been populated by the values of the given {@link Properties}
 */
@Nonnull
public static Configuration createConfiguration(Properties properties) {
  final Configuration configuration = new Configuration();
  final Set<String> propertyNames = properties.stringPropertyNames();
  for (String propertyName : propertyNames) {
    configuration.setString(propertyName, properties.getProperty(propertyName));
  }
  return configuration;
}

代码示例来源:origin: apache/flink

@Test (expected = RuntimeException.class)
public void testSetFileViaConfigurationEmptyPath() {
  final DummyFileInputFormat format = new DummyFileInputFormat();
  final String filePath = null;
  Configuration conf = new Configuration();
  conf.setString("input.file.path", filePath);
  format.configure(conf);
}

代码示例来源:origin: apache/flink

private static Configuration getConfiguration() {
  Configuration config = new Configuration();
  config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
  config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
  config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
  return config;
}

代码示例来源:origin: apache/flink

@Test
public void testGetEnvironmentVariables() {
  Configuration testConf = new Configuration();
  testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
  Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
  Assert.assertEquals(1, res.size());
  Map.Entry<String, String> entry = res.entrySet().iterator().next();
  Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
  Assert.assertEquals("/usr/lib/native", entry.getValue());
}

代码示例来源:origin: apache/flink

protected static Configuration getFlinkConfiguration() {
  Configuration flinkConfig = new Configuration();
  flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
  flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
  flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
  flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
  flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
  return flinkConfig;
}

代码示例来源:origin: apache/flink

@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
  Configuration conf = new Configuration();
  conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
  Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}

相关文章