org.apache.flink.configuration.Configuration类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(13.6k)|赞(0)|评价(0)|浏览(768)

本文整理了Java中org.apache.flink.configuration.Configuration类的一些代码示例,展示了Configuration类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration类的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration

Configuration介绍

[英]Lightweight configuration object which stores key/value pairs.
[中]存储键/值对的轻量级配置对象。

代码示例

代码示例来源:origin: apache/flink

public PythonPlanBinder(Configuration globalConfig) {
  String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
  tmpPlanFilesDir = configuredPlanTmpPath != null
    ? configuredPlanTmpPath
    : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
  operatorConfig = new Configuration();
  operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
  String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
  if (configuredTmpDataDir != null) {
    operatorConfig.setString(PythonOptions.DATA_TMP_DIR, configuredTmpDataDir);
  }
  operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE));
}

代码示例来源:origin: apache/flink

public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration,
    List<URL> jarFiles, List<URL> globalClasspaths) {
  this.clientConfiguration = clientConfiguration;
  this.jarFiles = jarFiles;
  this.globalClasspaths = globalClasspaths;
  clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
  clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
  clientConfiguration.setInteger(RestOptions.PORT, inet.getPort());
}

代码示例来源:origin: apache/flink

private void createBroadcastVariable(PythonOperationInfo info) {
  UdfOperator<?> op1 = (UdfOperator<?>) sets.getDataSet(info.parentID);
  DataSet<?> op2 = sets.getDataSet(info.otherID);
  op1.withBroadcastSet(op2, info.name);
  Configuration c = op1.getParameters();
  if (c == null) {
    c = new Configuration();
  }
  int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
  c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
  c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name);
  op1.withParameters(c);
}

代码示例来源:origin: apache/flink

@Override
public Configuration clone() {
  Configuration config = new Configuration();
  config.addAll(this);
  return config;
}

代码示例来源:origin: apache/flink

@Override
public String getClusterDescription() {
  String host = config.getString(JobManagerOptions.ADDRESS, "");
  int port = config.getInteger(JobManagerOptions.PORT, -1);
  return "Standalone cluster at " + host + ":" + port;
}

代码示例来源:origin: apache/flink

public JobGraphGenerator(Configuration config) {
  this.defaultMaxFan = config.getInteger(AlgorithmOptions.SPILLING_MAX_FAN);
  this.defaultSortSpillingThreshold = config.getFloat(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
  this.useLargeRecordHandler = config.getBoolean(
      ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
      ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);
}

代码示例来源:origin: apache/flink

@Test
public void testExplicitlySetToLocal() throws Exception {
  final Configuration conf = new Configuration();
  conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, LocalFileSystem.getLocalFsURI().toString());
  FileSystem.initialize(conf);
  URI justPath = new URI(tempFolder.newFile().toURI().getPath());
  assertNull(justPath.getScheme());
  FileSystem fs = FileSystem.get(justPath);
  assertEquals("file", fs.getUri().getScheme());
}

代码示例来源:origin: apache/flink

@Test
public void testPojoTypeWithPartialFieldInCSV() throws Exception {
  File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
  tempFile.deleteOnExit();
  tempFile.setWritable(true);
  OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
  wrt.write("123,NODATA,AAA,NODATA,3.123,BBB\n");
  wrt.write("456,NODATA,BBB,NODATA,1.123,AAA\n");
  wrt.close();
  @SuppressWarnings("unchecked")
  PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
  CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true, false, true, true});
  inputFormat.configure(new Configuration());
  FileInputSplit[] splits = inputFormat.createInputSplits(1);
  inputFormat.open(splits[0]);
  validatePojoItem(inputFormat);
}

代码示例来源:origin: apache/flink

public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
  Configuration config = new Configuration();
  config.setString(CheckpointingOptions.STATE_BACKEND, backendType);
  config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toURI().toString());
  stateBackend = StateBackendLoader.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
}

代码示例来源:origin: apache/flink

/**
 * Test with one nested directory and recursive.file.enumeration = true
 */
@Test
public void testOneNestedDirectoryTrue() {
  try {
    String firstLevelDir = TestFileUtils.randomFileName();
    String secondLevelDir = TestFileUtils.randomFileName();
    File insideNestedDir = tempFolder.newFolder(firstLevelDir, secondLevelDir);
    File nestedDir = insideNestedDir.getParentFile();
    // create a file in the first-level and two files in the nested dir
    TestFileUtils.createTempFileInDirectory(nestedDir.getAbsolutePath(), "paella");
    TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "kalamari");
    TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua");
    this.format.setFilePath(new Path(nestedDir.toURI().toString()));
    this.config.setBoolean("recursive.file.enumeration", true);
    format.configure(this.config);
    FileInputSplit[] splits = format.createInputSplits(1);
    Assert.assertEquals(3, splits.length);
  } catch (Exception ex) {
    ex.printStackTrace();
    Assert.fail(ex.getMessage());
  }
}

代码示例来源:origin: apache/flink

protected Configuration createClusterConfig() throws IOException {
  TemporaryFolder temporaryFolder = new TemporaryFolder();
  temporaryFolder.create();
  final File haDir = temporaryFolder.newFolder();
  Configuration config = new Configuration();
  config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
  // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
  config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
  config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
  if (zkServer != null) {
    config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
    config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
  }
  return config;
}

代码示例来源:origin: apache/flink

@Before
public void setup() {
  testingFatalErrorHandler = new TestingFatalErrorHandler();
  flinkConfig = new Configuration();
  flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
  File root = folder.getRoot();
  File home = new File(root, "home");
  boolean created = home.mkdir();
  assertTrue(created);
  env = new HashMap<>();
  env.put(ENV_APP_ID, "foo");
  env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
  env.put(ENV_CLIENT_SHIP_FILES, "");
  env.put(ENV_FLINK_CLASSPATH, "");
  env.put(ENV_HADOOP_USER_NAME, "foo");
  env.put(FLINK_JAR_PATH, root.toURI().toString());
}

代码示例来源:origin: apache/flink

/**
 * Ensure that the program parallelism can be set even if the configuration is supplied.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
  Configuration config = new Configuration();
  config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
  final URI restAddress = MINI_CLUSTER_RESOURCE.getMiniCluster().getRestAddress();
  final String hostname = restAddress.getHost();
  final int port = restAddress.getPort();
  final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
      hostname,
      port,
      config
  );
  env.setParallelism(USER_DOP);
  env.getConfig().disableSysoutLogging();
  DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
      .rebalance()
      .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
        @Override
        public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
          out.collect(getRuntimeContext().getIndexOfThisSubtask());
        }
      });
  List<Integer> resultCollection = result.collect();
  assertEquals(USER_DOP, resultCollection.size());
}

代码示例来源:origin: apache/flink

@Test
  public void testExplicitlyPathTakesPrecedence() throws Exception {
    final Configuration conf = new Configuration();
    conf.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "otherFS://localhost:1234/");
    FileSystem.initialize(conf);

    URI pathAndScheme = tempFolder.newFile().toURI();
    assertNotNull(pathAndScheme.getScheme());

    FileSystem fs = FileSystem.get(pathAndScheme);
    assertEquals("file", fs.getUri().getScheme());
  }
}

代码示例来源:origin: apache/flink

/**
 * Writes the given job manager address to the associated configuration object.
 *
 * @param address Address to write to the configuration
 * @param config The configuration to write to
 */
static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
  config.setString(JobManagerOptions.ADDRESS, address.getHostString());
  config.setInteger(JobManagerOptions.PORT, address.getPort());
  config.setString(RestOptions.ADDRESS, address.getHostString());
  config.setInteger(RestOptions.PORT, address.getPort());
}

代码示例来源:origin: apache/flink

@Test
public void testMultipleYarnShipOptions() throws Exception {
  final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
  final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    new Configuration(),
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
  AbstractYarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
  assertEquals(2, flinkYarnDescriptor.shipFiles.size());
}

代码示例来源:origin: apache/flink

@Test
public void testZookeeperNamespaceProperty() throws Exception {
  String zkNamespaceCliInput = "flink_test_namespace";
  String[] params = new String[] {"-yn", "2", "-yz", zkNamespaceCliInput};
  FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
    new Configuration(),
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
  AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
  assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
}

代码示例来源:origin: apache/flink

/**
 * Ensure that the user can pass a custom configuration object to the LocalEnvironment.
 */
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
  Configuration conf = new Configuration();
  conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);
  final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
  env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
  env.getConfig().disableSysoutLogging();
  DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
      .rebalance()
      .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
        @Override
        public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
          out.collect(getRuntimeContext().getIndexOfThisSubtask());
        }
      });
  List<Integer> resultCollection = result.collect();
  assertEquals(PARALLELISM, resultCollection.size());
}

代码示例来源:origin: apache/flink

public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
  Preconditions.checkNotNull(config);
  Preconditions.checkNotNull(numFinishedPolls);
  if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
    LOG.info("Enabling SSL for the history server.");
    try {
  webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
  webPort = config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
  webRefreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL);
  String webDirectory = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR);
  if (webDirectory == null) {
    webDirectory = System.getProperty("java.io.tmpdir") + File.separator + "flink-web-history-" + UUID.randomUUID();
  String refreshDirectories = config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
  if (refreshDirectories == null) {
    throw new FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
  for (String refreshDirectory : refreshDirectories.split(",")) {
    try {
      Path refreshPath = WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
      FileSystem refreshFS = refreshPath.getFileSystem();
      refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
    } catch (Exception e) {
  long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
  archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);

代码示例来源:origin: apache/flink

@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
  File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
  final Configuration configuration = new Configuration();
  configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
  final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
    configuration,
    tmp.getRoot().getAbsolutePath(),
    "y",
    "yarn");
  final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true);
  final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(commandLine);
  assertEquals(TEST_YARN_APPLICATION_ID_2, clusterId);
}

相关文章