org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(187)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()方法的一些代码示例,展示了Environment.getTaskManagerInfo()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getTaskManagerInfo()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getTaskManagerInfo

Environment.getTaskManagerInfo介绍

[英]Gets the task manager info, with configuration and hostname.
[中]获取任务管理器信息,包括配置和主机名。

代码示例

代码示例来源:origin: apache/flink

  1. private void tryShutdownTimerService() {
  2. if (timerService != null && !timerService.isTerminated()) {
  3. try {
  4. final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
  5. getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
  6. if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
  7. LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
  8. "timers. Will continue with shutdown procedure.", timeoutMs);
  9. }
  10. } catch (Throwable t) {
  11. // catch and log the exception to not replace the original exception
  12. LOG.error("Could not shut down timer service", t);
  13. }
  14. }
  15. }

代码示例来源:origin: apache/flink

  1. private StateBackend createStateBackend() throws Exception {
  2. final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
  3. return StateBackendLoader.fromApplicationOrConfigOrDefault(
  4. fromApplication,
  5. getEnvironment().getTaskManagerInfo().getConfiguration(),
  6. getUserCodeClassLoader(),
  7. LOG);
  8. }

代码示例来源:origin: apache/flink

  1. String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
  2. ensureRocksDBIsLoaded(tempDir);

代码示例来源:origin: apache/flink

  1. getCheckpointLock(),
  2. getEnvironment().getIOManager(),
  3. getEnvironment().getTaskManagerInfo().getConfiguration(),
  4. getStreamStatusMaintainer(),
  5. this.headOperator,

代码示例来源:origin: apache/flink

  1. @Override
  2. public void invoke() throws Exception {
  3. RecordReader<SpeedTestRecord> reader = new RecordReader<>(
  4. getEnvironment().getInputGate(0),
  5. SpeedTestRecord.class,
  6. getEnvironment().getTaskManagerInfo().getTmpDirectories());
  7. try {
  8. boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
  9. int numRecords = 0;
  10. while (reader.next() != null) {
  11. if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
  12. Thread.sleep(IS_SLOW_SLEEP_MS);
  13. }
  14. }
  15. }
  16. finally {
  17. reader.clearBuffers();
  18. }
  19. }
  20. }

代码示例来源:origin: apache/flink

  1. Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
  2. int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
  3. if (historySize <= 0) {

代码示例来源:origin: apache/flink

  1. @Override
  2. public void init() throws Exception {
  3. StreamConfig configuration = getConfiguration();
  4. TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  5. int numberOfInputs = configuration.getNumberOfInputs();
  6. if (numberOfInputs > 0) {
  7. InputGate[] inputGates = getEnvironment().getAllInputGates();
  8. inputProcessor = new StreamInputProcessor<>(
  9. inputGates,
  10. inSerializer,
  11. this,
  12. configuration.getCheckpointMode(),
  13. getCheckpointLock(),
  14. getEnvironment().getIOManager(),
  15. getEnvironment().getTaskManagerInfo().getConfiguration(),
  16. getStreamStatusMaintainer(),
  17. this.headOperator,
  18. getEnvironment().getMetricGroup().getIOMetricGroup(),
  19. inputWatermarkGauge);
  20. }
  21. headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  22. // wrap watermark gauge since registered metrics must be unique
  23. getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
  24. }

代码示例来源:origin: apache/flink

  1. @Override
  2. public void invoke() throws Exception {
  3. RecordReader<SpeedTestRecord> reader = new RecordReader<>(
  4. getEnvironment().getInputGate(0),
  5. SpeedTestRecord.class,
  6. getEnvironment().getTaskManagerInfo().getTmpDirectories());
  7. RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));
  8. try {
  9. SpeedTestRecord record;
  10. while ((record = reader.next()) != null) {
  11. writer.emit(record);
  12. }
  13. }
  14. finally {
  15. reader.clearBuffers();
  16. writer.clearBuffers();
  17. writer.flushAll();
  18. }
  19. }
  20. }

代码示例来源:origin: apache/flink

  1. final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
  2. final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
  3. ? getExecutionConfig().getLatencyTrackingInterval()

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

  1. @Override
  2. public TaskManagerRuntimeInfo getTaskManagerInfo() {
  3. return getEnvironment().getTaskManagerInfo();
  4. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

  1. @Override
  2. public TaskManagerRuntimeInfo getTaskManagerInfo() {
  3. return getEnvironment().getTaskManagerInfo();
  4. }

代码示例来源:origin: org.apache.flink/flink-runtime

  1. @Override
  2. public TaskManagerRuntimeInfo getTaskManagerInfo() {
  3. return getEnvironment().getTaskManagerInfo();
  4. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. @Override
  2. public TaskManagerRuntimeInfo getTaskManagerInfo() {
  3. return getEnvironment().getTaskManagerInfo();
  4. }

代码示例来源:origin: org.apache.flink/flink-streaming-java

  1. private void tryShutdownTimerService() {
  2. if (timerService != null && !timerService.isTerminated()) {
  3. try {
  4. final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
  5. getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
  6. if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
  7. LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
  8. "timers. Will continue with shutdown procedure.", timeoutMs);
  9. }
  10. } catch (Throwable t) {
  11. // catch and log the exception to not replace the original exception
  12. LOG.error("Could not shut down timer service", t);
  13. }
  14. }
  15. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

  1. private void tryShutdownTimerService() {
  2. if (timerService != null && !timerService.isTerminated()) {
  3. try {
  4. final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
  5. getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
  6. if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
  7. LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
  8. "timers. Will continue with shutdown procedure.", timeoutMs);
  9. }
  10. } catch (Throwable t) {
  11. // catch and log the exception to not replace the original exception
  12. LOG.error("Could not shut down timer service", t);
  13. }
  14. }
  15. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

  1. private StateBackend createStateBackend() throws Exception {
  2. final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());
  3. if (fromJob != null) {
  4. // backend has been configured on the environment
  5. LOG.info("Using user-defined state backend: {}.", fromJob);
  6. return fromJob;
  7. }
  8. else {
  9. return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
  10. getEnvironment().getTaskManagerInfo().getConfiguration(),
  11. getUserCodeClassLoader(),
  12. LOG);
  13. }
  14. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

  1. private StateBackend createStateBackend() throws Exception {
  2. final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
  3. return StateBackendLoader.fromApplicationOrConfigOrDefault(
  4. fromApplication,
  5. getEnvironment().getTaskManagerInfo().getConfiguration(),
  6. getUserCodeClassLoader(),
  7. LOG);
  8. }

代码示例来源:origin: org.apache.flink/flink-streaming-java

  1. private StateBackend createStateBackend() throws Exception {
  2. final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
  3. return StateBackendLoader.fromApplicationOrConfigOrDefault(
  4. fromApplication,
  5. getEnvironment().getTaskManagerInfo().getConfiguration(),
  6. getUserCodeClassLoader(),
  7. LOG);
  8. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

  1. @Override
  2. public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
  3. this.container = containingTask;
  4. this.config = config;
  5. this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
  6. this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
  7. if (config.isChainStart()) {
  8. ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
  9. }
  10. if (config.isChainEnd()) {
  11. ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
  12. }
  13. Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
  14. int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
  15. if (historySize <= 0) {
  16. LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
  17. historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
  18. }
  19. latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
  20. this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
  21. stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
  22. stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
  23. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

  1. @Override
  2. public void init() throws Exception {
  3. StreamConfig configuration = getConfiguration();
  4. TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  5. int numberOfInputs = configuration.getNumberOfInputs();
  6. if (numberOfInputs > 0) {
  7. InputGate[] inputGates = getEnvironment().getAllInputGates();
  8. inputProcessor = new StreamInputProcessor<>(
  9. inputGates,
  10. inSerializer,
  11. this,
  12. configuration.getCheckpointMode(),
  13. getCheckpointLock(),
  14. getEnvironment().getIOManager(),
  15. getEnvironment().getTaskManagerInfo().getConfiguration(),
  16. getStreamStatusMaintainer(),
  17. this.headOperator);
  18. // make sure that stream tasks report their I/O statistics
  19. inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
  20. }
  21. }

相关文章