org.apache.flink.runtime.execution.Environment.getIOManager()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(10.0k)|赞(0)|评价(0)|浏览(166)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getIOManager()方法的一些代码示例,展示了Environment.getIOManager()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Environment.getIOManager()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getIOManager

Environment.getIOManager介绍

[英]Returns the current IOManager.
[中]返回当前IOManager。

代码示例

代码示例来源:origin: apache/flink

initializedDbBasePaths = env.getIOManager().getSpillingDirectories();

代码示例来源:origin: apache/flink

configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),

代码示例来源:origin: apache/flink

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator,
        getEnvironment().getMetricGroup().getIOMetricGroup(),
        inputWatermarkGauge);
  }
  headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  // wrap watermark gauge since registered metrics must be unique
  getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: org.apache.flink/flink-runtime

@Override
public IOManager getIOManager() {
  return getEnvironment().getIOManager();
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

@Override
public IOManager getIOManager() {
  return getEnvironment().getIOManager();
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

@Override
public IOManager getIOManager() {
  return getEnvironment().getIOManager();
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

@Override
public IOManager getIOManager() {
  return getEnvironment().getIOManager();
}

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.11

initializedDbBasePaths = env.getIOManager().getSpillingDirectories();

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb

initializedDbBasePaths = env.getIOManager().getSpillingDirectories();

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.10

initializedDbBasePaths = env.getIOManager().getSpillingDirectories();

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),

代码示例来源:origin: com.alibaba.blink/flink-table

@Override
public void open() throws Exception {
  super.open();
  isFinished1 = false;
  isFinished2 = false;
  collector = new StreamRecordCollector<>(output);
  serializer1 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
  serializer2 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn2(getUserCodeClassloader());
  leftNullRow = new GenericRow(serializer1.getNumFields());
  rightNullRow = new GenericRow(serializer2.getNumFields());
  joinedRow = new JoinedRow();
  CookedClasses classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
  condFunc = classes.condFuncClass.newInstance();
  keyComparator = classes.keyComparatorClass.newInstance();
  keyComparator.init(keyGSorter.serializers(), keyGSorter.comparators());
  memManager = getContainingTask().getEnvironment().getMemoryManager();
  ioManager = getContainingTask().getEnvironment().getIOManager();
  Projection<BaseRow, BinaryRow> projection1 = classes.projectionClass1.newInstance();
  Projection<BaseRow, BinaryRow> projection2 = classes.projectionClass2.newInstance();
  int pageNum1 = (int) (leftBufferMemory / memManager.getPageSize());
  List<MemorySegment> mem1 = memManager.allocatePages(getContainingTask(), pageNum1);
  buffer1 = new WrappedBuffer(mem1, serializer1, projection1);
  int pageNum2 = (int) (rightBufferMemory / memManager.getPageSize());
  List<MemorySegment> mem2 = memManager.allocatePages(getContainingTask(), pageNum2);
  buffer2 = new WrappedBuffer(mem2, serializer2, projection2);
  initGauge();
  initJoin();
}

代码示例来源:origin: com.alibaba.blink/flink-table

@Override
public void open() throws Exception {
  super.open();
  LOG.info("Opening SortOperator");
  cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
  TypeSerializer<BaseRow> inputSerializer = getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
  this.binarySerializer =
      new BinaryRowSerializer(((AbstractRowSerializer) inputSerializer).getTypes());
  MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
  IOManager ioManager = this.getContainingTask().getEnvironment().getIOManager();
  NormalizedKeyComputer computer = computerClass.newInstance();
  RecordComparator comparator = comparatorClass.newInstance();
  computer.init(gSorter.serializers(), gSorter.comparators());
  comparator.init(gSorter.serializers(), gSorter.comparators());
  this.sorter = new BinaryExternalSorter(this.getContainingTask(), memManager, reservedMemorySize,
      maxMemorySize, perRequestMemorySize, ioManager, inputSerializer, binarySerializer,
      computer, comparator, getSqlConf());
  this.sorter.startThreads();
  gSorter = null;
  collector = new StreamRecordCollector<>(output);
  //register the the metrics.
  getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
  getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
  getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator);
    // make sure that stream tasks report their I/O statistics
    inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
  }
}

代码示例来源:origin: com.alibaba.blink/flink-table

IOManager ioManager = getContainingTask().getEnvironment().getIOManager();

代码示例来源:origin: com.alibaba.blink/flink-table

@Override
public void open() throws Exception {
  super.open();
  isFinished1 = false;
  isFinished2 = false;
  classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
  Collector<BaseRow> collector = new StreamRecordCollector<>(output);
  inputSerializer1 = (AbstractRowSerializer) getOperatorConfig()
    .getTypeSerializerIn1(getUserCodeClassloader());
  serializer1 = new BinaryRowSerializer(inputSerializer1.getTypes());
  inputSerializer2 = (AbstractRowSerializer) getOperatorConfig()
    .getTypeSerializerIn2(getUserCodeClassloader());
  serializer2 = new BinaryRowSerializer(inputSerializer2.getTypes());
  memManager = this.getContainingTask().getEnvironment().getMemoryManager();
  ioManager = this.getContainingTask().getEnvironment().getIOManager();
  JoinConditionFunction condFunc = classes.condFuncClass.newInstance();
  leftNullRow = new GenericRow(serializer1.getNumFields());
  rightNullRow = new GenericRow(serializer2.getNumFields());
  JoinedRow joinedRow = new JoinedRow();
  helper =
    new SortMergeJoinHelper(collector, condFunc, leftNullRow, rightNullRow, joinedRow);
  initSorter();
  probeBuffer =
    newBuffer(probeBufferMemory, leftNeedsSort ? inputSerializer2 : inputSerializer1);
  initGauge();
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator,
        getEnvironment().getMetricGroup().getIOMetricGroup(),
        inputWatermarkGauge);
  }
  headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  // wrap watermark gauge since registered metrics must be unique
  getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

@Override
public void init() throws Exception {
  StreamConfig configuration = getConfiguration();
  TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  int numberOfInputs = configuration.getNumberOfInputs();
  if (numberOfInputs > 0) {
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
        inputGates,
        inSerializer,
        this,
        configuration.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator,
        getEnvironment().getMetricGroup().getIOMetricGroup(),
        inputWatermarkGauge);
  }
  headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  // wrap watermark gauge since registered metrics must be unique
  getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

代码示例来源:origin: com.alibaba.blink/flink-table

this.ioManager = this.getContainingTask().getEnvironment().getIOManager();

相关文章