This article collects code examples of the Java method org.apache.flink.configuration.Configuration.<init>(), showing how Configuration.<init>() is used in practice. The snippets are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they make useful references. Details of the Configuration.<init>() method:

Package path: org.apache.flink.configuration.Configuration
Class name: Configuration
Method name: <init>

Creates a new empty configuration.
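
Before the project examples, here is a minimal, self-contained sketch of the two constructors that appear below: the no-argument constructor and the copy constructor. The key name used here is an illustrative assumption, not taken from any of the quoted projects.

import org.apache.flink.configuration.Configuration;

public class ConfigurationExample {
    public static void main(String[] args) {
        // Create a new, empty configuration and set a value.
        Configuration config = new Configuration();
        config.setString("taskmanager.numberOfTaskSlots", "4");

        // The copy constructor takes a snapshot; later changes to the
        // copy do not affect the original (see testCopyConstructor below).
        Configuration copy = new Configuration(config);
        copy.setString("taskmanager.numberOfTaskSlots", "8");

        System.out.println(config.getString("taskmanager.numberOfTaskSlots", "1")); // prints 4
        System.out.println(copy.getString("taskmanager.numberOfTaskSlots", "1"));   // prints 8
    }
}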
Code example source: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
    this.fsConfig = new Configuration();
    for (Map.Entry<String, String> entry : config) {
        fsConfig.setString(entry.getKey(), entry.getValue());
    }
    return this;
}
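
For context, a hypothetical call site might look like the sketch below; the base path and the Hadoop configuration key are illustrative assumptions, not taken from the project.

// Assumed usage of setFSConfig; the path and the key are placeholders.
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
hadoopConf.set("io.file.buffer.size", "65536");
RollingSink<String> sink = new RollingSink<String>("/tmp/rolling-sink-output")
    .setFSConfig(hadoopConf);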
Code example source: apache/flink

@Test
public void testCopyConstructor() {
    try {
        final String key = "theKey";

        Configuration cfg1 = new Configuration();
        cfg1.setString(key, "value");

        Configuration cfg2 = new Configuration(cfg1);
        cfg2.setString(key, "another value");

        assertEquals("value", cfg1.getString(key, ""));
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Code example source: apache/flink

@Test
public void loadFromClasspathByDefault() {
    org.apache.hadoop.conf.Configuration hadoopConf =
            HadoopUtils.getHadoopConfiguration(new Configuration());
    assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
}
Code example source: apache/flink

@Test
public void testUnresolvableHostname2() throws Exception {
    InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
    RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
            Collections.<URL>emptyList(), Collections.<URL>emptyList());
    try {
        exec.executePlan(getProgram());
        fail("This should fail with an UnknownHostException");
    }
    catch (UnknownHostException ignored) {
        // that is what we want!
    }
}
Code example source: apache/flink

@Test
public void testBindAddressFirstDeprecatedKey() {
    final Configuration configuration = new Configuration();
    final String expectedAddress = "foobar";

    configuration.setString("web.address", expectedAddress);

    final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);

    assertThat(actualAddress, is(equalTo(expectedAddress)));
}
Code example source: apache/flink

@Test
public void testConfigure() throws Exception {
    ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
    HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);

    hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());

    verify(inputFormat, times(1)).setConf(any(Configuration.class));
}
Code example source: apache/flink

@Test
public void testCorrectSettingOfDetachedMode() throws Exception {
    String[] params = new String[] {"-yd"};

    FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");
    final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);

    AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);

    // the -yd flag should put the cluster descriptor into detached mode
    assertTrue(descriptor.isDetachedMode());
}
Code example source: apache/flink

private static Configuration getClusterConfiguration() {
    Configuration config = new Configuration();
    try {
        File logDir = File.createTempFile("TestBaseUtils-logdir", null);
        assertTrue("Unable to delete temp file", logDir.delete());
        assertTrue("Unable to create temp directory", logDir.mkdir());
        File logFile = new File(logDir, "jobmanager.log");
        File outFile = new File(logDir, "jobmanager.out");

        Files.createFile(logFile.toPath());
        Files.createFile(outFile.toPath());

        config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
        config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
    } catch (Exception e) {
        throw new AssertionError("Could not setup test.", e);
    }

    config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

    return config;
}
Code example source: apache/flink

@Before
public void setup() {
    testingFatalErrorHandler = new TestingFatalErrorHandler();

    flinkConfig = new Configuration();
    flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);

    File root = folder.getRoot();
    File home = new File(root, "home");
    boolean created = home.mkdir();
    assertTrue(created);

    env = new HashMap<>();
    env.put(ENV_APP_ID, "foo");
    env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
    env.put(ENV_CLIENT_SHIP_FILES, "");
    env.put(ENV_FLINK_CLASSPATH, "");
    env.put(ENV_HADOOP_USER_NAME, "foo");
    env.put(FLINK_JAR_PATH, root.toURI().toString());
}
Code example source: apache/flink

@Before
public void setUp() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());

    Plan plan = env.createProgramPlan();
    JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(), Collections.<URL>emptyList());

    program = mock(PackagedProgram.class);
    when(program.getPlanWithJars()).thenReturn(jobWithJars);

    final int freePort = NetUtils.getAvailablePort();
    config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, freePort);
    config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());

    try {
        scala.Tuple2<String, Object> address = new scala.Tuple2<String, Object>("localhost", freePort);
        jobManagerSystem = AkkaUtils.createActorSystem(config, new scala.Some<scala.Tuple2<String, Object>>(address));
    }
    catch (Exception e) {
        e.printStackTrace();
        fail("Setup of test actor system failed.");
    }
}
Code example source: apache/flink

@Test
public void testGetEnvironmentVariablesErroneous() {
    Configuration testConf = new Configuration();
    testConf.setString("yarn.application-master.env.", "/usr/lib/native");

    Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);

    Assert.assertEquals(0, res.size());
}
Code example source: apache/flink

@Test
public void testMultipleYarnShipOptions() throws Exception {
    final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
    final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);

    AbstractYarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);

    assertEquals(2, flinkYarnDescriptor.shipFiles.size());
}
Code example source: apache/flink

@Test
public void testGetStatisticsNonExistingFile() {
    try {
        final DummyFileInputFormat format = new DummyFileInputFormat();
        format.setFilePath("file:///some/none/existing/directory/");
        format.configure(new Configuration());

        BaseStatistics stats = format.getStatistics(null);
        Assert.assertNull("The file statistics should be null.", stats);
    } catch (Exception ex) {
        ex.printStackTrace();
        Assert.fail(ex.getMessage());
    }
}
Code example source: apache/flink

@Test
public void testBindAddressSecondDeprecatedKey() {
    final Configuration configuration = new Configuration();
    final String expectedAddress = "foobar";

    configuration.setString("jobmanager.web.address", expectedAddress);

    final String actualAddress = configuration.getString(RestOptions.BIND_ADDRESS);

    assertThat(actualAddress, is(equalTo(expectedAddress)));
}
Code example source: apache/flink

@Test
public void testRepeatedClose() throws Exception {
    final Configuration flinkConfig = new Configuration();

    final YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
    services.closeAndCleanupAllData();

    // this should not throw an exception
    services.close();
}
Code example source: apache/flink

@Test
public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
    final Configuration configuration = new Configuration();
    final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
        configuration,
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);

    final AbstractYarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);

    final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();

    String zkNs = clusterDescriptorConfiguration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
    assertTrue(zkNs.matches("application_\\d+_0042"));
}
Code example source: apache/flink

/**
 * Specify a custom {@code Configuration} that will be used when creating
 * the {@link FileSystem} for writing.
 */
public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
    this.fsConfig = new Configuration();
    for (Map.Entry<String, String> entry : config) {
        fsConfig.setString(entry.getKey(), entry.getValue());
    }
    return this;
}
Code example source: apache/flink

@Test
public void testGetEnvironmentVariables() {
    Configuration testConf = new Configuration();
    testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", "/usr/lib/native");

    Map<String, String> res = Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);

    Assert.assertEquals(1, res.size());
    Map.Entry<String, String> entry = res.entrySet().iterator().next();
    Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
    Assert.assertEquals("/usr/lib/native", entry.getValue());
}
Code example source: apache/flink

@Test
public void testZookeeperNamespaceProperty() throws Exception {
    String zkNamespaceCliInput = "flink_test_namespace";
    String[] params = new String[] {"-yn", "2", "-yz", zkNamespaceCliInput};

    FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);

    AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);

    assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
}
Code example source: apache/flink

@Test
public void testSamplingOverlyLongRecord() {
    try {
        final String tempFile = TestFileUtils.createTempFile(2 * OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN.defaultValue());
        final Configuration conf = new Configuration();

        final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
        format.setFilePath(tempFile);
        format.configure(conf);

        Assert.assertNull("Expected exception due to overly long record.", format.getStatistics(null));
    } catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}