
x33g5p2x  于2022-01-18 转载在 其他  



[英]Checks whether there is an entry for the given config option.


代码示例来源:origin: apache/flink

public boolean contains(ConfigOption<?> configOption) {
  return backingConfig.contains(prefixOption(configOption, prefix));

代码示例来源:origin: apache/flink

private File getUploadDir(Configuration configuration) {
  File baseDir = new File(configuration.getString(WebOptions.UPLOAD_DIR,
  boolean uploadDirSpecified = configuration.contains(WebOptions.UPLOAD_DIR);
  return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());

代码示例来源:origin: apache/flink

 * Creates a {@link LocalEnvironment} for local program execution that also starts the
 * web monitoring UI.
 * <p>The local execution environment will run the program in a multi-threaded fashion in
 * the same JVM as the environment was created in. It will use the parallelism specified in the
 * parameter.
 * <p>If the configuration key 'rest.port' was set in the configuration, that particular
 * port will be used for the web UI. Otherwise, the default port (8081) will be used.
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
  checkNotNull(conf, "conf");
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
  if (!conf.contains(RestOptions.PORT)) {
    // explicitly set this option so that it's not set to 0 later
    conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
  return createLocalEnvironment(conf, -1);

代码示例来源:origin: apache/flink

 * Creates a {@link LocalStreamEnvironment} for local program execution that also starts the
 * web monitoring UI.
 * <p>The local execution environment will run the program in a multi-threaded fashion in
 * the same JVM as the environment was created in. It will use the parallelism specified in the
 * parameter.
 * <p>If the configuration key 'rest.port' was set in the configuration, that particular
 * port will be used for the web UI. Otherwise, the default port (8081) will be used.
public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
  checkNotNull(conf, "conf");
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
  if (!conf.contains(RestOptions.PORT)) {
    // explicitly set this option so that it's not set to 0 later
    conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
  return createLocalEnvironment(defaultLocalParallelism, conf);

代码示例来源:origin: apache/flink

private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
  if (!configuration.contains(RestOptions.PORT)) {
    configuration.setInteger(RestOptions.PORT, 0);
  final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
        TaskManagerOptions.NUM_TASK_SLOTS, 1))
  final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
  configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
  return miniCluster;

代码示例来源:origin: apache/flink

Protos.Credential.Builder credential = null;
if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
  throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
  if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {

代码示例来源:origin: apache/flink

if (!configuration.contains(RestOptions.PORT)) {
  configuration.setInteger(RestOptions.PORT, 0);

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

 * Returns whether the new network buffer memory configuration is present in the configuration
 * object, i.e. at least one new parameter is given or the old one is not present.
 * @param config configuration object
 * @return <tt>true</tt> if the new configuration method is used, <tt>false</tt> otherwise
public static boolean hasNewNetworkBufConf(final Configuration config) {
  return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||

代码示例来源:origin: org.apache.flink/flink-runtime

 * Returns whether the new network buffer memory configuration is present in the configuration
 * object, i.e. at least one new parameter is given or the old one is not present.
 * @param config configuration object
 * @return <tt>true</tt> if the new configuration method is used, <tt>false</tt> otherwise
public static boolean hasNewNetworkBufConf(final Configuration config) {
  return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

 * Returns whether the new network buffer memory configuration is present in the configuration
 * object, i.e. at least one new parameter is given or the old one is not present.
 * @param config configuration object
 * @return <tt>true</tt> if the new configuration method is used, <tt>false</tt> otherwise
public static boolean hasNewNetworkBufConf(final Configuration config) {
  return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||


 * Returns whether the new network buffer memory configuration is present in the configuration
 * object, i.e. at least one new parameter is given or the old one is not present.
 * @param config configuration object
 * @return <tt>true</tt> if the new configuration method is used, <tt>false</tt> otherwise
public static boolean hasNewNetworkBufConf(final Configuration config) {
  return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
    config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||


public boolean contains(ConfigOption<?> configOption) {
  return backingConfig.contains(prefixOption(configOption, prefix));

代码示例来源:origin: org.apache.flink/flink-core

public boolean contains(ConfigOption<?> configOption) {
  return backingConfig.contains(prefixOption(configOption, prefix));

代码示例来源:origin: org.apache.flink/flink-runtime-web_2.11

private File getUploadDir(Configuration configuration) {
  File baseDir = new File(configuration.getString(WebOptions.UPLOAD_DIR,
  boolean uploadDirSpecified = configuration.contains(WebOptions.UPLOAD_DIR);
  return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());

代码示例来源:origin: org.apache.flink/flink-runtime-web

private File getUploadDir(Configuration configuration) {
  File baseDir = new File(configuration.getString(WebOptions.UPLOAD_DIR,
  boolean uploadDirSpecified = configuration.contains(WebOptions.UPLOAD_DIR);
  return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

 * Set temporary configuration directories if necessary.
 * @param configuration flink config to patch
 * @param defaultDirs in case no tmp directories is set, next directories will be applied
public static void updateTmpDirectoriesInConfiguration(
    Configuration configuration,
    @Nullable String defaultDirs) {
  if (configuration.contains(CoreOptions.TMP_DIRS)) {"Overriding Fink's temporary file directories with those " +
      "specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
  } else if (defaultDirs != null) {"Setting directories for temporary files to: {}", defaultDirs);
    configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
    configuration.setBoolean(USE_LOCAL_DEFAULT_TMP_DIRS, true);

代码示例来源:origin: org.apache.flink/flink-runtime

 * Set temporary configuration directories if necessary.
 * @param configuration flink config to patch
 * @param defaultDirs in case no tmp directories is set, next directories will be applied
public static void updateTmpDirectoriesInConfiguration(
    Configuration configuration,
    @Nullable String defaultDirs) {
  if (configuration.contains(CoreOptions.TMP_DIRS)) {"Overriding Fink's temporary file directories with those " +
      "specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
  } else if (defaultDirs != null) {"Setting directories for temporary files to: {}", defaultDirs);
    configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
    configuration.setBoolean(USE_LOCAL_DEFAULT_TMP_DIRS, true);

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

private static Time getSlotRequestTimeout(final Configuration configuration) {
    final long slotRequestTimeoutMs;
    if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
      LOGGER.warn("Config key {} is deprecated; use {} instead.",
      slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
    } else {
      slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
    return Time.milliseconds(slotRequestTimeoutMs);

代码示例来源:origin: org.apache.flink/flink-runtime

private static Time getSlotRequestTimeout(final Configuration configuration) {
    final long slotRequestTimeoutMs;
    if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
      LOGGER.warn("Config key {} is deprecated; use {} instead.",
      slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
    } else {
      slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
    return Time.milliseconds(slotRequestTimeoutMs);

代码示例来源:origin: DTStack/flinkx

 * Creates a {@link LocalEnvironment} for local program execution that also starts the
 * web monitoring UI.
 * <p>The local execution environment will run the program in a multi-threaded fashion in
 * the same JVM as the environment was created in. It will use the parallelism specified in the
 * parameter.
 * <p>If the configuration key 'rest.port' was set in the configuration, that particular
 * port will be used for the web UI. Otherwise, the default port (8081) will be used.
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
  checkNotNull(conf, "conf");
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
  if (!conf.contains(RestOptions.PORT)) {
    // explicitly set this option so that it's not set to 0 later
    conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
  return createLocalEnvironment(conf, -1);
