本文整理了Java中org.apache.flink.configuration.Configuration类的一些代码示例,展示了Configuration类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Configuration类的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
[英]Lightweight configuration object which stores key/value pairs.
[中]存储键/值对的轻量级配置对象。
代码示例来源:origin: apache/flink
/**
 * Creates a plan binder from the global configuration.
 *
 * <p>The temporary plan directory is taken from {@code PythonOptions.PLAN_TMP_DIR} when it is
 * set; otherwise a randomly named directory below {@code java.io.tmpdir} is used. The Python
 * binary path, the optional data temp directory and the mmap file size are copied into a fresh
 * operator configuration.
 *
 * @param globalConfig configuration the Python options are read from
 */
public PythonPlanBinder(Configuration globalConfig) {
    final String planDir = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
    if (planDir != null) {
        tmpPlanFilesDir = planDir;
    } else {
        tmpPlanFilesDir = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
    }

    operatorConfig = new Configuration();

    final String pythonBinary = globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH);
    operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);

    // The data temp directory is only forwarded when it was explicitly configured.
    final String dataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
    if (dataDir != null) {
        operatorConfig.setString(PythonOptions.DATA_TMP_DIR, dataDir);
    }

    final long mmapFileSize = globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE);
    operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, mmapFileSize);
}
代码示例来源:origin: apache/flink
public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,
List<URL> jarFiles, List<URL> globalClasspaths) {
this.clientConfiguration = clientConfiguration;
this.jarFiles = jarFiles;
this.globalClasspaths = globalClasspaths;
clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
clientConfiguration.setInteger(RestOptions.PORT, inet.getPort());
}
代码示例来源:origin: apache/flink
/**
 * Attaches the data set identified by {@code info.otherID} as a broadcast set named
 * {@code info.name} to the operator identified by {@code info.parentID}, and records the
 * broadcast variable name and the updated count in the operator's parameters.
 *
 * @param info operation description providing the parent id, the other id and the name
 */
private void createBroadcastVariable(PythonOperationInfo info) {
    final UdfOperator<?> target = (UdfOperator<?>) sets.getDataSet(info.parentID);
    final DataSet<?> broadcastSet = sets.getDataSet(info.otherID);
    target.withBroadcastSet(broadcastSet, info.name);

    // Reuse the operator's parameters if present, otherwise start with an empty configuration.
    final Configuration existing = target.getParameters();
    final Configuration params = existing != null ? existing : new Configuration();

    // The current count doubles as the index of the newly registered variable name.
    final int index = params.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
    params.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, index + 1);
    params.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + index, info.name);

    target.withParameters(params);
}
代码示例来源:origin: apache/flink
/**
 * Returns a new {@code Configuration} into which all entries of this instance were added.
 *
 * @return the newly created copy
 */
@Override
public Configuration clone() {
    final Configuration copy = new Configuration();
    copy.addAll(this);
    return copy;
}
代码示例来源:origin: apache/flink
/**
 * Builds a short description of the cluster from the configured job manager address and port.
 * A missing address is rendered as an empty string and a missing port as {@code -1}.
 *
 * @return description of the form {@code "Standalone cluster at <host>:<port>"}
 */
@Override
public String getClusterDescription() {
    return "Standalone cluster at "
        + config.getString(JobManagerOptions.ADDRESS, "")
        + ":"
        + config.getInteger(JobManagerOptions.PORT, -1);
}
代码示例来源:origin: apache/flink
public JobGraphGenerator(Configuration config) {
this.defaultMaxFan = config.getInteger(AlgorithmOptions.SPILLING_MAX_FAN);
this.defaultSortSpillingThreshold = config.getFloat(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
this.useLargeRecordHandler = config.getBoolean(
ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);
}
代码示例来源:origin: apache/flink
/**
 * With the default file system scheme explicitly set to the local file system URI, a URI
 * without a scheme must resolve to a file system whose scheme is {@code "file"}.
 */
@Test
public void testExplicitlySetToLocal() throws Exception {
    final String localFsUri = LocalFileSystem.getLocalFsURI().toString();

    final Configuration fsConfig = new Configuration();
    fsConfig.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, localFsUri);
    FileSystem.initialize(fsConfig);

    // Rebuild the URI from the path component only, so it carries no scheme.
    final String pathOnly = tempFolder.newFile().toURI().getPath();
    final URI schemelessUri = new URI(pathOnly);
    assertNull(schemelessUri.getScheme());

    final FileSystem resolved = FileSystem.get(schemelessUri);
    assertEquals("file", resolved.getUri().getScheme());
}
代码示例来源:origin: apache/flink
/**
 * Writes a two-line CSV file with six columns, reads it through a {@code PojoCsvInputFormat}
 * that includes only the columns flagged {@code true} in the field mask, and validates the
 * resulting POJOs.
 */
@Test
public void testPojoTypeWithPartialFieldInCSV() throws Exception {
    File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
    tempFile.deleteOnExit();
    tempFile.setWritable(true);

    // try-with-resources guarantees the writer (and the underlying stream) is closed even if
    // a write fails. The test data is plain ASCII; the explicit charset merely removes the
    // dependency on the platform default encoding.
    try (OutputStreamWriter wrt = new OutputStreamWriter(
            new FileOutputStream(tempFile), java.nio.charset.StandardCharsets.UTF_8)) {
        wrt.write("123,NODATA,AAA,NODATA,3.123,BBB\n");
        wrt.write("456,NODATA,BBB,NODATA,1.123,AAA\n");
    }

    @SuppressWarnings("unchecked")
    PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
    // Columns 2 and 4 (the NODATA columns) are excluded by the mask.
    CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(
        new Path(tempFile.toURI().toString()),
        typeInfo,
        new boolean[]{true, false, true, false, true, true});

    inputFormat.configure(new Configuration());
    FileInputSplit[] splits = inputFormat.createInputSplits(1);
    inputFormat.open(splits[0]);

    validatePojoItem(inputFormat);
}
代码示例来源:origin: apache/flink
public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND, backendType);
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toURI().toString());
stateBackend = StateBackendLoader.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
}
代码示例来源:origin: apache/flink
/**
 * Test with one nested directory and recursive.file.enumeration = true.
 *
 * <p>One file is created in the first-level directory and two in the nested directory, so
 * three input splits are expected. Exceptions propagate to the test runner so that the full
 * stack trace is reported instead of only the exception message.
 */
@Test
public void testOneNestedDirectoryTrue() throws Exception {
    String firstLevelDir = TestFileUtils.randomFileName();
    String secondLevelDir = TestFileUtils.randomFileName();

    File insideNestedDir = tempFolder.newFolder(firstLevelDir, secondLevelDir);
    File nestedDir = insideNestedDir.getParentFile();

    // create a file in the first-level and two files in the nested dir
    TestFileUtils.createTempFileInDirectory(nestedDir.getAbsolutePath(), "paella");
    TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "kalamari");
    TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua");

    this.format.setFilePath(new Path(nestedDir.toURI().toString()));
    this.config.setBoolean("recursive.file.enumeration", true);
    format.configure(this.config);

    FileInputSplit[] splits = format.createInputSplits(1);
    Assert.assertEquals(3, splits.length);
}
代码示例来源:origin: apache/flink
/**
 * Builds the cluster configuration used by the test: managed memory size, an upper bound for
 * network buffer memory, and the Akka frame size. When a ZooKeeper server is available, high
 * availability is configured as well, with its storage path in a freshly created temporary
 * folder.
 *
 * @return the assembled configuration
 * @throws IOException if the temporary folder cannot be created
 */
protected Configuration createClusterConfig() throws IOException {
    final TemporaryFolder tmp = new TemporaryFolder();
    tmp.create();
    final File haStorageDir = tmp.newFolder();

    final Configuration clusterConfig = new Configuration();
    clusterConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");

    // the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
    final long maxNetworkBufferBytes = 80L << 20; // 80 MB
    clusterConfig.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(maxNetworkBufferBytes));
    clusterConfig.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");

    if (zkServer == null) {
        return clusterConfig;
    }

    clusterConfig.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    clusterConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    clusterConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toURI().toString());
    return clusterConfig;
}
代码示例来源:origin: apache/flink
/**
 * Prepares the per-test fixtures: a fatal error handler, a Flink configuration with the
 * containerized heap cutoff minimum set to 100, a {@code home} directory below the temporary
 * root, and the environment variable map.
 */
@Before
public void setup() {
    testingFatalErrorHandler = new TestingFatalErrorHandler();

    flinkConfig = new Configuration();
    flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);

    final File rootDir = folder.getRoot();
    final File homeDir = new File(rootDir, "home");
    // The directory must be newly created; an existing one fails the setup.
    assertTrue(homeDir.mkdir());

    env = new HashMap<>();
    env.put(ENV_APP_ID, "foo");
    env.put(ENV_CLIENT_HOME_DIR, homeDir.getAbsolutePath());
    env.put(ENV_CLIENT_SHIP_FILES, "");
    env.put(ENV_FLINK_CLASSPATH, "");
    env.put(ENV_HADOOP_USER_NAME, "foo");
    env.put(FLINK_JAR_PATH, rootDir.toURI().toString());
}
代码示例来源:origin: apache/flink
/**
 * Ensure that the program parallelism can be set even if the configuration is supplied.
 *
 * <p>Each parallel subtask emits its own subtask index, so the number of collected elements
 * must equal the parallelism set on the environment.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
    final Configuration clientConfig = new Configuration();
    clientConfig.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);

    final URI restEndpoint = MINI_CLUSTER_RESOURCE.getMiniCluster().getRestAddress();
    final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        restEndpoint.getHost(),
        restEndpoint.getPort(),
        clientConfig);
    env.setParallelism(USER_DOP);
    env.getConfig().disableSysoutLogging();

    final DataSet<Integer> subtaskIndices = env
        .createInput(new ParallelismDependentInputFormat())
        .rebalance()
        .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect(getRuntimeContext().getIndexOfThisSubtask());
            }
        });

    final List<Integer> collected = subtaskIndices.collect();
    assertEquals(USER_DOP, collected.size());
}
代码示例来源:origin: apache/flink
/**
 * Even with a different default file system scheme configured, a URI that carries its own
 * scheme must resolve to the file system of that scheme ({@code "file"} here).
 */
@Test
public void testExplicitlyPathTakesPrecedence() throws Exception {
    final Configuration fsConfig = new Configuration();
    fsConfig.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
    FileSystem.initialize(fsConfig);

    final URI fileUri = tempFolder.newFile().toURI();
    assertNotNull(fileUri.getScheme());

    final String resolvedScheme = FileSystem.get(fileUri).getUri().getScheme();
    assertEquals("file", resolvedScheme);
}
}
代码示例来源:origin: apache/flink
/**
 * Writes the given job manager address to the associated configuration object.
 *
 * <p>The host string and port are stored under both the job manager options and the REST
 * options.
 *
 * @param config The configuration to write to
 * @param address Address to write to the configuration
 */
static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
    final String host = address.getHostString();
    final int port = address.getPort();

    config.setString(JobManagerOptions.ADDRESS, host);
    config.setInteger(JobManagerOptions.PORT, port);
    config.setString(RestOptions.ADDRESS, host);
    config.setInteger(RestOptions.PORT, port);
}
代码示例来源:origin: apache/flink
/**
 * Passing {@code --yarnship} twice on the command line must result in two ship files on the
 * created cluster descriptor.
 */
@Test
public void testMultipleYarnShipOptions() throws Exception {
    final String firstShipDir = tmp.newFolder().getAbsolutePath();
    final String secondShipDir = tmp.newFolder().getAbsolutePath();
    final String[] cliArgs = {"run", "--yarnship", firstShipDir, "--yarnship", secondShipDir};

    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    final CommandLine parsed = cli.parseCommandLineOptions(cliArgs, false);
    final AbstractYarnClusterDescriptor descriptor = cli.createClusterDescriptor(parsed);

    assertEquals(2, descriptor.shipFiles.size());
}
代码示例来源:origin: apache/flink
/**
 * The ZooKeeper namespace passed via {@code -yz} must be returned by the created cluster
 * descriptor.
 */
@Test
public void testZookeeperNamespaceProperty() throws Exception {
    final String expectedNamespace = "flink_test_namespace";
    final String[] cliArgs = {"-yn", "2", "-yz", expectedNamespace};

    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    final CommandLine parsed = cli.parseCommandLineOptions(cliArgs, true);
    final AbstractYarnClusterDescriptor clusterDescriptor = cli.createClusterDescriptor(parsed);

    assertEquals(expectedNamespace, clusterDescriptor.getZookeeperNamespace());
}
代码示例来源:origin: apache/flink
/**
 * Ensure that the user can pass a custom configuration object to the LocalEnvironment.
 *
 * <p>The number of task slots is set through the configuration and the parallelism is set to
 * {@code ExecutionConfig.PARALLELISM_AUTO_MAX}; each subtask emits its index, so the number
 * of collected elements must equal {@code PARALLELISM}.
 */
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
    final Configuration localConfig = new Configuration();
    localConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);

    final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(localConfig);
    env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
    env.getConfig().disableSysoutLogging();

    final DataSet<Integer> subtaskIndices = env
        .createInput(new ParallelismDependentInputFormat())
        .rebalance()
        .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect(getRuntimeContext().getIndexOfThisSubtask());
            }
        });

    final List<Integer> collected = subtaskIndices.collect();
    assertEquals(PARALLELISM, collected.size());
}
代码示例来源:origin: apache/flink
// Constructor of the history server. Reads the web address/port/refresh interval, the web
// directory and the archive directories from the given configuration and creates the archive
// fetcher.
//
// NOTE(review): this excerpt appears to be truncated -- the SSL try block has no visible body
// or catch, several if/for/try blocks are never closed, and the catch body is missing, so the
// snippet does not compile as shown. Confirm against the original source before relying on it.
public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
// Both arguments are mandatory.
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(numFinishedPolls);
// SSL is only enabled when both the history-server flag and the REST SSL check are true.
if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
LOG.info("Enabling SSL for the history server.");
try {
webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
webPort = config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
webRefreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL);
String webDirectory = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR);
// Fall back to a randomly named directory below java.io.tmpdir when no web directory is configured.
if (webDirectory == null) {
webDirectory = System.getProperty("java.io.tmpdir") + File.separator + "flink-web-history-" + UUID.randomUUID();
String refreshDirectories = config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
// The archive directories are required; their absence is reported as a FlinkException.
if (refreshDirectories == null) {
throw new FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
// The configured value is a comma-separated list; each entry is validated and registered.
for (String refreshDirectory : refreshDirectories.split(",")) {
try {
Path refreshPath = WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
FileSystem refreshFS = refreshPath.getFileSystem();
refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
} catch (Exception e) {
long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);
代码示例来源:origin: apache/flink
/**
 * An application id passed via {@code -yid} must be returned as the cluster id even though a
 * YARN properties file location is configured.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

    final Configuration cliConfig = new Configuration();
    cliConfig.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        cliConfig,
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    final String[] cliArgs = {"-yid", TEST_YARN_APPLICATION_ID_2.toString()};
    final CommandLine parsed = cli.parseCommandLineOptions(cliArgs, true);

    final ApplicationId resolvedId = cli.getClusterId(parsed);
    assertEquals(TEST_YARN_APPLICATION_ID_2, resolvedId);
}
内容来源于网络,如有侵权,请联系作者删除!