org.apache.flink.configuration.Configuration.setLong()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(10.2k)|赞(0)|评价(0)|浏览(221)

本文整理了Java中org.apache.flink.configuration.Configuration.setLong()方法的一些代码示例,展示了Configuration.setLong()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.setLong()方法的具体详情如下:
包路径:org.apache.flink.configuration.Configuration
类名称:Configuration
方法名:setLong

Configuration.setLong介绍

[英]Adds the given key/value pair to the configuration object.
[中]将给定的键/值对添加到配置对象。

代码示例

代码示例来源:origin: apache/flink

@Override
public void setLong(String key, long value) {
  this.backingConfig.setLong(this.prefix + key, value);
}

代码示例来源:origin: apache/flink

public void setBufferTimeout(long timeout) {
  config.setLong(BUFFER_TIMEOUT, timeout);
}

代码示例来源:origin: apache/flink

public void setIterationWaitTime(long time) {
  config.setLong(ITERATON_WAIT, time);
}

代码示例来源:origin: apache/flink

@Override
public void setLong(ConfigOption<Long> key, long value) {
  this.backingConfig.setLong(prefix + key.key(), value);
}

代码示例来源:origin: apache/flink

public PythonPlanBinder(Configuration globalConfig) {
  String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
  tmpPlanFilesDir = configuredPlanTmpPath != null
    ? configuredPlanTmpPath
    : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
  operatorConfig = new Configuration();
  operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
  String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
  if (configuredTmpDataDir != null) {
    operatorConfig.setString(PythonOptions.DATA_TMP_DIR, configuredTmpDataDir);
  }
  operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE));
}

代码示例来源:origin: apache/flink

@Before
public void setUp() throws Exception {
  final Configuration config = new Configuration();
  config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1);
  config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2);
  config.setLong(RestOptions.RETRY_DELAY, 3);
  restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(config);
}

代码示例来源:origin: apache/flink

private static Configuration getConfiguration() {
  Configuration config = new Configuration();
  config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
  config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
  return config;
}

代码示例来源:origin: apache/flink

/**
 * Write out the tuples in a temporary file and return it.
 */
@Before
public void writeTuples() throws IOException {
  this.tempFile = File.createTempFile("BinaryInputFormat", null);
  this.tempFile.deleteOnExit();
  Configuration configuration = new Configuration();
  configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
  if (this.parallelism == 1) {
    BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI().toString(),
        configuration);
    for (int index = 0; index < this.numberOfTuples; index++) {
      output.writeRecord(this.getRecord(index));
    }
    output.close();
  } else {
    this.tempFile.delete();
    this.tempFile.mkdir();
    int recordIndex = 0;
    for (int fileIndex = 0; fileIndex < this.parallelism; fileIndex++) {
      BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI() + "/" +
          (fileIndex + 1), configuration);
      for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
        output.writeRecord(this.getRecord(recordIndex));
      }
      output.close();
    }
  }
}

代码示例来源:origin: apache/flink

config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.PORT, 0);
config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L);
config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

代码示例来源:origin: apache/flink

taskInfo.addAllResources(portResources);
Iterator<String> portsToAssign = tmPortKeys.iterator();
rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
if (portsToAssign.hasNext()) {
  throw new IllegalArgumentException("insufficient # of ports assigned");

代码示例来源:origin: apache/flink

@Test
public void testCreateInputSplitsWithOneFile() throws IOException {
  // create temporary file with 3 blocks
  final File tempFile = File.createTempFile("binary_input_format_test", "tmp");
  tempFile.deleteOnExit();
  final int blockInfoSize = new BlockInfo().getInfoSize();
  final int blockSize = blockInfoSize + 8;
  final int numBlocks = 3;
  FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
  for(int i = 0; i < blockSize * numBlocks; i++) {
    fileOutputStream.write(new byte[]{1});
  }
  fileOutputStream.close();
  final Configuration config = new Configuration();
  config.setLong("input.block_size", blockSize + 10);
  final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat();
  inputFormat.setFilePath(tempFile.toURI().toString());
  inputFormat.setBlockSize(blockSize);
  
  inputFormat.configure(config);
  
  FileInputSplit[] inputSplits = inputFormat.createInputSplits(numBlocks);
  
  Assert.assertEquals("Returns requested numbers of splits.", numBlocks, inputSplits.length);
  Assert.assertEquals("1. split has block size length.", blockSize, inputSplits[0].getLength());
  Assert.assertEquals("2. split has block size length.", blockSize, inputSplits[1].getLength());
  Assert.assertEquals("3. split has block size length.", blockSize, inputSplits[2].getLength());
}

代码示例来源:origin: apache/flink

@Before
public void setUp() throws Exception {
  final Configuration clientConfig = new Configuration();
  clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
  clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
  clientConfig.addAll(CLUSTER.getClientConfiguration());
  client = new RestClusterClient<>(
    clientConfig,
    StandaloneClusterId.getInstance()
  );
}

代码示例来源:origin: apache/flink

private void executeSchedulingTest(Configuration configuration) throws Exception {
  configuration.setInteger(RestOptions.PORT, 0);
  final long slotIdleTimeout = 50L;
  configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout);
  final int parallelism = 4;
  final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
    .setConfiguration(configuration)
    .setNumTaskManagers(parallelism)
    .setNumSlotsPerTaskManager(1)
    .build();
  try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
    miniCluster.start();
    MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
    JobGraph jobGraph = createJobGraph(slotIdleTimeout << 1, parallelism);
    CompletableFuture<JobSubmissionResult> submissionFuture = miniClusterClient.submitJob(jobGraph);
    // wait for the submission to succeed
    JobSubmissionResult jobSubmissionResult = submissionFuture.get();
    CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobSubmissionResult.getJobID());
    JobResult jobResult = resultFuture.get();
    assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
  }
}

代码示例来源:origin: apache/flink

/**
 * Verifies that latency metrics can be enabled via the configuration.
 */
@Test
public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
  testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
    Configuration tmConfig = new Configuration();
    tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);
    Environment env = MockEnvironment.builder()
      .setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
      .build();
    setupSourceOperator(operator, new ExecutionConfig(), env, timeProvider);
  });
}

代码示例来源:origin: apache/flink

/**
 * Verifies that latency metrics can be enabled via the {@link ExecutionConfig} even if they are disabled via
 * the configuration.
 */
@Test
public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
  testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
    ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
    Configuration tmConfig = new Configuration();
    tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 0L);
    Environment env = MockEnvironment.builder()
      .setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
      .build();
    setupSourceOperator(operator, executionConfig, env, timeProvider);
  });
}

代码示例来源:origin: apache/flink

/**
 * Verifies that latency metrics can be disabled via the {@link ExecutionConfig} even if they are enabled via
 * the configuration.
 */
@Test
public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
  testLatencyMarkEmission(0, (operator, timeProvider) -> {
    Configuration tmConfig = new Configuration();
    tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);
    Environment env = MockEnvironment.builder()
      .setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
      .build();
    ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setLatencyTrackingInterval(0);
    setupSourceOperator(operator, executionConfig, env, timeProvider);
  });
}

代码示例来源:origin: apache/flink

orig.setString("mykey", "myvalue");
orig.setInteger("mynumber", 100);
orig.setLong("longvalue", 478236947162389746L);
orig.setFloat("PI", 3.1415926f);
orig.setDouble("E", Math.E);

代码示例来源:origin: apache/flink

pc.setLong("long", 15);
pc.setLong("too_long", TOO_LONG);
pc.setFloat("float", 2.1456775f);
pc.setDouble("double", Math.PI);

代码示例来源:origin: uber/AthenaX

static LocalFlinkMiniCluster execute(LocalStreamEnvironment env,
                    Configuration conf, String jobName) throws Exception {
  StreamGraph streamGraph = env.getStreamGraph();
  streamGraph.setJobName(jobName);
  JobGraph jobGraph = streamGraph.getJobGraph();
  Configuration configuration = new Configuration(conf);
  configuration.addAll(jobGraph.getJobConfiguration());
  configuration.setLong("taskmanager.memory.size", -1L);
  configuration.setInteger("taskmanager.numberOfTaskSlots", jobGraph.getMaximumParallelism());

  LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, true);
  cluster.start();
  cluster.submitJobDetached(jobGraph);
  return cluster;
 }
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

public Configuration generateConfiguration() {
  Configuration newConfiguration = new Configuration(config);
  // set the memory
  long memory = getOrCalculateManagedMemoryPerTaskManager();
  newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);
  return newConfiguration;
}

相关文章