org.apache.flink.configuration.Configuration.<init>()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(10.6k)|赞(0)|评价(0)|浏览(256)

本文整理了Java中org.apache.flink.configuration.Configuration.<init>()方法的一些代码示例,展示了Configuration.<init>()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.<init>()方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:<init>

Configuration.<init>介绍

[英]Creates a new empty configuration.
[中]创建新的空配置。

代码示例

代码示例来源:origin: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
  this.fsConfig = new Configuration();
  for (Map.Entry<String, String> entry : config) {
    fsConfig.setString(entry.getKey(), entry.getValue());
  }
  return this;
}

代码示例来源:origin: apache/flink

@Test
public void testCopyConstructor() {
  try {
    final String key = "theKey";
    Configuration cfg1 = new Configuration();
    cfg1.setString(key, "value");
    Configuration cfg2 = new Configuration(cfg1);
    cfg2.setString(key, "another value");
    assertEquals("value", cfg1.getString(key, ""));
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void loadFromClasspathByDefault() {
  org.apache.hadoop.conf.Configuration hadoopConf =
      HadoopUtils.getHadoopConfiguration(new Configuration());
  assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
}

代码示例来源:origin: apache/flink

@Test
public void testUnresolvableHostname2() throws Exception {
  InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
  RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
      Collections.<URL>emptyList(), Collections.<URL>emptyList());
  try {
    exec.executePlan(getProgram());
    fail("This should fail with an ProgramInvocationException");
  }
  catch (UnknownHostException ignored) {
    // that is what we want!
  }
}

代码示例来源:origin: apache/flink

@Test
public void testBindAddressFirstDeprecatedKey() {
  final Configuration configuration = new Configuration();
  final String expectedAddress = "foobar";
  configuration.setString("web.address", expectedAddress);
  final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);
  assertThat(actualAddress, is(equalTo(expectedAddress)));
}

代码示例来源:origin: apache/flink

@Test
public void testConfigure() throws Exception {
  ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
  HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
  hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
  verify(inputFormat, times(1)).setConf(any(Configuration.class));
}

代码示例来源:origin: apache/flink

@Test
public void testCorrectSettingOfDetachedMode() throws Exception {
  String[] params =
    new String[] {"-yd"};
  FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
    new Configuration(),
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
  AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
  // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
  assertTrue(descriptor.isDetachedMode());
}

代码示例来源:origin: apache/flink

private static Configuration getClusterConfiguration() {
  Configuration config = new Configuration();
  try {
    File logDir = File.createTempFile("TestBaseUtils-logdir", null);
    assertTrue("Unable to delete temp file", logDir.delete());
    assertTrue("Unable to create temp directory", logDir.mkdir());
    File logFile = new File(logDir, "jobmanager.log");
    File outFile = new File(logDir, "jobmanager.out");
    Files.createFile(logFile.toPath());
    Files.createFile(outFile.toPath());
    config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
  } catch (Exception e) {
    throw new AssertionError("Could not setup test.", e);
  }
  config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
  config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
  return config;
}

代码示例来源:origin: apache/flink

@Before
public void setup() {
  testingFatalErrorHandler = new TestingFatalErrorHandler();
  flinkConfig = new Configuration();
  flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
  File root = folder.getRoot();
  File home = new File(root, "home");
  boolean created = home.mkdir();
  assertTrue(created);
  env = new HashMap<>();
  env.put(ENV_APP_ID, "foo");
  env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
  env.put(ENV_CLIENT_SHIP_FILES, "");
  env.put(ENV_FLINK_CLASSPATH, "");
  env.put(ENV_HADOOP_USER_NAME, "foo");
  env.put(FLINK_JAR_PATH, root.toURI().toString());
}

代码示例来源:origin: apache/flink

@Before
public void setUp() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
  env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
  Plan plan = env.createProgramPlan();
  JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(),  Collections.<URL>emptyList());
  program = mock(PackagedProgram.class);
  when(program.getPlanWithJars()).thenReturn(jobWithJars);
  final int freePort = NetUtils.getAvailablePort();
  config = new Configuration();
  config.setString(JobManagerOptions.ADDRESS, "localhost");
  config.setInteger(JobManagerOptions.PORT, freePort);
  config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
  try {
    scala.Tuple2<String, Object> address = new scala.Tuple2<String, Object>("localhost", freePort);
    jobManagerSystem = AkkaUtils.createActorSystem(config, new scala.Some<scala.Tuple2<String, Object>>(address));
  }
  catch (Exception e) {
    e.printStackTrace();
    fail("Setup of test actor system failed.");
  }
}

代码示例来源:origin: apache/flink

@Test
public void testGetEnvironmentVariablesErroneous() {
  Configuration testConf = new Configuration();
  testConf.setString("yarn.application-master.env.", "/usr/lib/native");
  Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
  Assert.assertEquals(0, res.size());
}

代码示例来源:origin: apache/flink

@Test
public void testMultipleYarnShipOptions() throws Exception {
  final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
  final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    new Configuration(),
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
  AbstractYarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
  assertEquals(2, flinkYarnDescriptor.shipFiles.size());
}

代码示例来源:origin: apache/flink

@Test
public void testGetStatisticsNonExistingFile() {
  try {
    final DummyFileInputFormat format = new DummyFileInputFormat();
    format.setFilePath("file:///some/none/existing/directory/");
    format.configure(new Configuration());
    
    BaseStatistics stats = format.getStatistics(null);
    Assert.assertNull("The file statistics should be null.", stats);
  } catch (Exception ex) {
    ex.printStackTrace();
    Assert.fail(ex.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
  public void testBindAddressSecondDeprecatedKey() {
    final Configuration configuration = new Configuration();
    final String expectedAddress = "foobar";
    configuration.setString("jobmanager.web.address", expectedAddress);

    final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);

    assertThat(actualAddress, is(equalTo(expectedAddress)));
  }
}

代码示例来源:origin: apache/flink

@Test
public void testRepeatedClose() throws Exception {
  final Configuration flinkConfig = new Configuration();
  final YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
  services.closeAndCleanupAllData();
  // this should not throw an exception
  services.close();
}

代码示例来源:origin: apache/flink

@Test
public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
  final Configuration configuration = new Configuration();
  final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    configuration,
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
  final AbstractYarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
  final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
  String zkNs = clusterDescriptorConfiguration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
  assertTrue(zkNs.matches("application_\\d+_0042"));
}

代码示例来源:origin: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
  this.fsConfig = new Configuration();
  for (Map.Entry<String, String> entry : config) {
    fsConfig.setString(entry.getKey(), entry.getValue());
  }
  return this;
}

代码示例来源:origin: apache/flink

@Test
public void testGetEnvironmentVariables() {
  Configuration testConf = new Configuration();
  testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");
  Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
  Assert.assertEquals(1, res.size());
  Map.Entry<String, String> entry = res.entrySet().iterator().next();
  Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
  Assert.assertEquals("/usr/lib/native", entry.getValue());
}

代码示例来源:origin: apache/flink

@Test
public void testZookeeperNamespaceProperty() throws Exception {
  String zkNamespaceCliInput = "flink_test_namespace";
  String[] params = new String[] {"-yn", "2", "-yz", zkNamespaceCliInput};
  FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
    new Configuration(),
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
  AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
  assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
}

代码示例来源:origin: apache/flink

@Test
public void testSamplingOverlyLongRecord() {
  try {
    final String tempFile = TestFileUtils.createTempFile(2 * OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN.defaultValue());
    final Configuration conf = new Configuration();
    
    final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
    format.setFilePath(tempFile);
    format.configure(conf);
    
    Assert.assertNull("Expected exception due to overly long record.", format.getStatistics(null));
  } catch (Exception e) {
    e.printStackTrace();
    Assert.fail(e.getMessage());
  }
}

相关文章