Usage of the org.apache.flink.runtime.execution.Environment class, with code examples

x33g5p2x · Reposted on 2022-01-19 under "Other"

This article collects Java code examples for the org.apache.flink.runtime.execution.Environment class and shows how the class is used in practice. The examples come from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful references. Details of the Environment class:
Package: org.apache.flink.runtime.execution
Class: Environment

About Environment

The Environment gives the code executed in a task access to the task's properties (such as name and parallelism), the configurations, the data stream readers and writers, as well as the various components provided by the TaskManager, such as the memory manager, the I/O manager, and so on.
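As a rough JDK-only sketch of that idea (the interface and stub below are illustrative stand-ins, not Flink's actual API), task code programs against an environment interface and the runtime supplies the implementation:

```java
// Illustrative stand-in for the Environment idea: task code asks the
// environment for its properties instead of holding them directly.
// This is NOT Flink's interface; names are simplified for the sketch.
interface TaskEnvironment {
    String getTaskName();
    int getParallelism();
    ClassLoader getUserClassLoader();
}

// A stub implementation such as a runtime or test harness might provide.
class StubEnvironment implements TaskEnvironment {
    private final String taskName;
    private final int parallelism;

    StubEnvironment(String taskName, int parallelism) {
        this.taskName = taskName;
        this.parallelism = parallelism;
    }

    @Override public String getTaskName() { return taskName; }
    @Override public int getParallelism() { return parallelism; }
    @Override public ClassLoader getUserClassLoader() {
        return Thread.currentThread().getContextClassLoader();
    }
}
```

The examples below show the real methods (getIOManager(), getTaskInfo(), getMetricGroup(), ...) that fill this role in Flink.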

Code examples

(Note: several of the excerpts below were captured as incomplete fragments; code elided by the original source is left as-is.)

Code example source: apache/flink

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator,
        getEnvironment().getMetricGroup().getIOMetricGroup(),
        inputWatermarkGauge);
  }
  headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  // wrap watermark gauge since registered metrics must be unique
  getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
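The comment about wrapping the watermark gauge is the interesting detail: the same gauge object cannot be registered twice because registered metrics must be unique, so the task-level registration wraps it in a method reference, which is a new object. A JDK-only sketch of that identity-based uniqueness (the GaugeRegistry class is invented for illustration; Flink's MetricGroup works differently internally):

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch: a registry that refuses to register the same gauge object twice,
// showing why wrapping a gauge in a method reference (a new object) lets it
// be registered a second time under another group.
class GaugeRegistry {
    private final Map<Supplier<Long>, String> registered = new IdentityHashMap<>();

    /** Returns false if this exact gauge instance was already registered. */
    boolean register(String name, Supplier<Long> gauge) {
        if (registered.containsKey(gauge)) {
            return false;
        }
        registered.put(gauge, name);
        return true;
    }
}
```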

Code example source: apache/flink

public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
                Environment env, Map<String, Accumulator<?, ?>> accumulators) {
  super(env.getTaskInfo(),
      env.getUserClassLoader(),
      operator.getExecutionConfig(),
      accumulators,
      env.getDistributedCacheEntries(),
      operator.getMetricGroup());
  this.operator = operator;
  this.taskEnvironment = env;
  this.streamConfig = new StreamConfig(env.getTaskConfiguration());
  this.operatorUniqueID = operator.getOperatorID().toString();
}

Code example source: apache/flink

@Override
public void init() throws Exception {
  final String iterationId = getConfiguration().getIterationId();
  if (iterationId == null || iterationId.length() == 0) {
    throw new Exception("Missing iteration ID in the task configuration");
  }
  final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
      getEnvironment().getTaskInfo().getIndexOfThisSubtask());
  final long iterationWaitTime = getConfiguration().getIterationWaitTime();
  LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
  @SuppressWarnings("unchecked")
  BlockingQueue<StreamRecord<IN>> dataChannel =
      (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
  LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
  this.headOperator = new RecordPusher<>();
  this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
  // call super.init() last because that needs this.headOperator to be set up
  super.init();
}
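The pattern here is that the iteration head and tail never hold references to each other; they rendezvous on a shared feedback queue looked up by a string broker ID derived from the job ID, iteration ID, and subtask index. A minimal JDK-only sketch of that broker pattern (class and method names are illustrative, not Flink's BlockingQueueBroker):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: head and tail tasks both call get(brokerId); whichever side
// arrives first creates the queue, and the other side finds it.
class QueueBroker<T> {
    private final ConcurrentMap<String, BlockingQueue<T>> handovers =
        new ConcurrentHashMap<>();

    BlockingQueue<T> get(String brokerId) {
        return handovers.computeIfAbsent(brokerId, id -> new ArrayBlockingQueue<>(1024));
    }

    /** Broker IDs combine job, iteration, and subtask so parallel instances don't collide. */
    static String createBrokerId(String jobId, String iterationId, int subtaskIndex) {
        return jobId + "-" + iterationId + "-" + subtaskIndex;
    }
}
```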

Code example source: apache/flink

@Override
public OperatorStateBackend createOperatorStateBackend(
    Environment env,
    String operatorIdentifier) throws Exception {
  // the default for RocksDB; eventually there could be an operator state backend based on RocksDB, too
  final boolean asyncSnapshots = true;
  return new DefaultOperatorStateBackend(
      env.getUserClassLoader(),
      env.getExecutionConfig(),
      asyncSnapshots);
}

Code example source: apache/flink

this.jobId = env.getJobID();
initializedDbBasePaths = env.getIOManager().getSpillingDirectories();

Code example source: org.apache.flink/flink-runtime_2.11

public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
        getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
        getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
  }
}
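The task-name parsing above is easy to verify in isolation: it keeps the part before the first "->" and strips a leading "CHAIN " prefix. A standalone sketch of exactly that logic (the wrapper class is invented; the two string operations are copied from the snippet):

```java
// Sketch: derive an operator-level name from a Flink-style chained task name
// such as "CHAIN Source: Custom Source -> Map".
class SourceNameParser {
    static String sourceName(String taskName) {
        // keep only the first operator in the chain description
        String name = taskName.split("->")[0].trim();
        // strip the "CHAIN " prefix (6 characters) if present
        return name.startsWith("CHAIN") ? name.substring(6) : name;
    }
}
```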

Code example source: apache/flink

InputGate reader = getEnvironment().getInputGate(i);
  switch (inputType) {
    case 1:
    configuration.getCheckpointMode(),
    getCheckpointLock(),
    getEnvironment().getIOManager(),
    getEnvironment().getTaskManagerInfo().getConfiguration(),
    getStreamStatusMaintainer(),
    this.headOperator,
    getEnvironment().getMetricGroup().getIOMetricGroup(),
    input1WatermarkGauge,
    input2WatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);

Code example source: com.alibaba.blink/flink-runtime

getEnvironment().getInputGate(0),
      getEnvironment().getTaskManagerInfo().getTmpDirectories(),
      getEnvironment().getTaskManagerInfo().getConfiguration());
} else if (groupSize > 1){
      new UnionInputGate(getEnvironment().getAllInputGates()),
      getEnvironment().getTaskManagerInfo().getTmpDirectories(),
      getEnvironment().getTaskManagerInfo().getConfiguration());
} else {
  throw new Exception("Illegal input group size in task configuration: " + groupSize);

Code example source: apache/flink

String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
ensureRocksDBIsLoaded(tempDir);
  env.getTaskStateManager().createLocalRecoveryConfig();
    env.getUserClassLoader(),
    instanceBasePath,
    getDbOptions(),
    numberOfKeyGroups,
    keyGroupRange,
    env.getExecutionConfig(),
    isIncrementalCheckpointsEnabled(),
    getNumberOfTransferingThreads(),

Code example source: com.alibaba.blink/flink-runtime

@Override
public AbstractInternalStateBackend createInternalStateBackend(
  Environment env,
  String operatorIdentifier,
  int numberOfGroups,
  KeyGroupRange keyGroupRange) {
  return new HeapInternalStateBackend(
      numberOfGroups,
      keyGroupRange,
      env.getUserClassLoader(),
      env.getTaskStateManager().createLocalRecoveryConfig(),
      env.getTaskKvStateRegistry(),
      isUsingAsynchronousSnapshots(),
      env.getExecutionConfig()
    );
}

Code example source: apache/flink

@Override
  public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());
    try {
      boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);
      int numRecords = 0;
      while (reader.next() != null) {
        if (isSlow && (numRecords++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
          Thread.sleep(IS_SLOW_SLEEP_MS);
        }
      }
    }
    finally {
      reader.clearBuffers();
    }
  }
}
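The slow-receiver behavior above boils down to a modulo check per record: when configured as slow, the task sleeps before every Nth record. A standalone sketch of that decision (the constant value here is assumed for illustration; the real IS_SLOW_EVERY_NUM_RECORDS is defined elsewhere in the test code):

```java
// Sketch: decide whether a deliberately slow receiver should sleep
// before processing the current record.
class Throttle {
    static final int SLOW_EVERY_NUM_RECORDS = 32; // assumed value

    static boolean shouldSleep(boolean isSlow, int recordIndex) {
        return isSlow && recordIndex % SLOW_EVERY_NUM_RECORDS == 0;
    }
}
```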

Code example source: com.alibaba.blink/flink-runtime

@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  Environment env,
  JobID jobID,
  String operatorIdentifier,
  TypeSerializer<K> keySerializer,
  int numberOfKeyGroups,
  KeyGroupRange keyGroupRange,
  TaskKvStateRegistry kvStateRegistry) {
  TaskStateManager taskStateManager = env.getTaskStateManager();
  LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
  return new HeapKeyedStateBackend<>(
    kvStateRegistry,
    keySerializer,
    env.getUserClassLoader(),
    numberOfKeyGroups,
    keyGroupRange,
    isUsingAsynchronousSnapshots(),
    env.getExecutionConfig(),
    localRecoveryConfig);
}

Code example source: apache/flink

private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
      StreamEdge edge,
      int outputIndex,
      Environment environment,
      String taskName,
      long bufferTimeout) {
    @SuppressWarnings("unchecked")
    StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);

    ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

    // we initialize the partitioner here with the number of key groups (aka max. parallelism)
    if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
      int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
      if (0 < numKeyGroups) {
        ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
      }
    }

    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
      RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
    output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return output;
  }
}
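The key-group step deserves a closer look: a keyed partitioner needs the maximum parallelism (the number of key groups) before it can route records, so it is configured only when the writer reports a positive count. A sketch with simplified stand-in types (the classes are invented for illustration; the channel-selection formula mirrors how Flink maps key groups onto parallel operator instances):

```java
// Simplified stand-ins for ConfigurableStreamPartitioner and a keyed partitioner.
interface ConfigurablePartitioner {
    void configure(int numKeyGroups);
}

class KeyGroupPartitioner implements ConfigurablePartitioner {
    int numKeyGroups = -1; // unset until configure() is called

    @Override public void configure(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
    }

    /** Map a key to a key group, then map the key group onto an output channel. */
    int selectChannel(int key, int numChannels) {
        int keyGroup = Math.floorMod(key, numKeyGroups);
        return keyGroup * numChannels / numKeyGroups;
    }
}
```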

Code example source: apache/flink

@Override
  public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());
    RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));
    try {
      SpeedTestRecord record;
      while ((record = reader.next()) != null) {
        writer.emit(record);
      }
    }
    finally {
      reader.clearBuffers();
      writer.clearBuffers();
      writer.flushAll();
    }
  }
}

Code example source: com.alibaba.blink/flink-table

@Override
public void open() throws Exception {
  super.open();
  LOG.info("Opening SortOperator");
  cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
  TypeSerializer<BaseRow> inputSerializer = getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
  this.binarySerializer =
      new BinaryRowSerializer(((AbstractRowSerializer) inputSerializer).getTypes());
  MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
  IOManager ioManager = this.getContainingTask().getEnvironment().getIOManager();
  NormalizedKeyComputer computer = computerClass.newInstance();
  RecordComparator comparator = comparatorClass.newInstance();
  computer.init(gSorter.serializers(), gSorter.comparators());
  comparator.init(gSorter.serializers(), gSorter.comparators());
  this.sorter = new BinaryExternalSorter(this.getContainingTask(), memManager, reservedMemorySize,
      maxMemorySize, perRequestMemorySize, ioManager, inputSerializer, binarySerializer,
      computer, comparator, getSqlConf());
  this.sorter.startThreads();
  gSorter = null;
  collector = new StreamRecordCollector<>(output);
  // register the metrics
  getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
  getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
  getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}

Code example source: apache/flink

private void tryShutdownTimerService() {
  if (timerService != null && !timerService.isTerminated()) {
    try {
      final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
        getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
      if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
          "timers. Will continue with shutdown procedure.", timeoutMs);
      }
    } catch (Throwable t) {
      // catch and log the exception to not replace the original exception
      LOG.error("Could not shut down timer service", t);
    }
  }
}
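The shutdown pattern here is worth noting: the wait for pending timers is bounded by a timeout read from the TaskManager configuration, and failure to terminate in time is logged rather than blocking cancellation forever. A JDK ExecutorService analogue of that pattern (the class and method are invented for the sketch; Flink's timer service has its own shutdown API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: shut down a timer-like service, waiting at most timeoutMs
// for in-flight work, and report whether termination completed in time.
class TimerShutdown {
    static boolean shutdownWithTimeout(ExecutorService timerService, long timeoutMs) {
        if (timerService.isTerminated()) {
            return true;
        }
        timerService.shutdown();
        try {
            return timerService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // caller logs and continues with the shutdown procedure
        }
    }
}
```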

Code example source: org.apache.flink/flink-streaming-java_2.10

private <T> RecordWriterOutput<T> createStreamOutput(
    StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
    Environment taskEnvironment,
    String taskName) {
  OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
  TypeSerializer outSerializer = null;
  if (edge.getOutputTag() != null) {
    // side output
    outSerializer = upStreamConfig.getTypeSerializerSideOut(
        edge.getOutputTag(), taskEnvironment.getUserClassLoader());
  } else {
    // main output
    outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
  }
  @SuppressWarnings("unchecked")
  StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
  LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
  ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
  // we initialize the partitioner here with the number of key groups (aka max. parallelism)
  if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
    int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
    if (0 < numKeyGroups) {
      ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
    }
  }
  StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
      new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
  output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
  return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
}

Code example source: apache/flink

TaskInfo taskInfo = environment.getTaskInfo();
    () -> stateBackend.createKeyedStateBackend(
      environment,
      environment.getJobID(),
      operatorIdentifierText,
      keySerializer,
      taskInfo.getMaxNumberOfParallelSubtasks(),
      keyGroupRange,
      environment.getTaskKvStateRegistry(),
      TtlTimeProvider.DEFAULT,
      metricGroup),

Code example source: apache/flink

@VisibleForTesting
public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
    StreamConfig configuration,
    Environment environment) {
  List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
  List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
  Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
  for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge edge = outEdgesInOrder.get(i);
    recordWriters.add(
      createRecordWriter(
        edge,
        i,
        environment,
        environment.getTaskInfo().getTaskName(),
        chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
  }
  return recordWriters;
}

Code example source: com.alibaba.blink/flink-table

IOManager ioManager = getContainingTask().getEnvironment().getIOManager();
    getContainingTask().getEnvironment().getTaskConfiguration().getBoolean(
        ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
        ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
    buildSerializer, probeSerializer,
    buildProjectionClass.newInstance(), probeProjectionClass.newInstance(),
    getContainingTask().getEnvironment().getMemoryManager(),
    parameter.reservedMemorySize,
    parameter.maxMemorySize,
