org.apache.flink.runtime.execution.Environment.getMetricGroup()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(15.5k)|赞(0)|评价(0)|浏览(244)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getMetricGroup()方法的一些代码示例,展示了Environment.getMetricGroup()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Environment.getMetricGroup()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getMetricGroup

Environment.getMetricGroup介绍

[英]Returns the task specific metric group.
[中]返回特定于任务的度量值组。

代码示例

代码示例来源:origin: apache/flink

  1. private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
  2. StreamEdge edge,
  3. int outputIndex,
  4. Environment environment,
  5. String taskName,
  6. long bufferTimeout) {
  7. @SuppressWarnings("unchecked")
  8. StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
  9. LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
  10. ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
  11. // we initialize the partitioner here with the number of key groups (aka max. parallelism)
  12. if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
  13. int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
  14. if (0 < numKeyGroups) {
  15. ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
  16. }
  17. }
  18. RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
  19. RecordWriter.createRecordWriter(bufferWriter, outputPartitioner, bufferTimeout, taskName);
  20. output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
  21. return output;
  22. }
  23. }

代码示例来源:origin: apache/flink

  1. getStreamStatusMaintainer(),
  2. this.headOperator,
  3. getEnvironment().getMetricGroup().getIOMetricGroup(),
  4. input1WatermarkGauge,
  5. input2WatermarkGauge);
  6. headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
  7. getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);

代码示例来源:origin: apache/flink

  1. @Override
  2. public void init() throws Exception {
  3. StreamConfig configuration = getConfiguration();
  4. TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  5. int numberOfInputs = configuration.getNumberOfInputs();
  6. if (numberOfInputs > 0) {
  7. InputGate[] inputGates = getEnvironment().getAllInputGates();
  8. inputProcessor = new StreamInputProcessor<>(
  9. inputGates,
  10. inSerializer,
  11. this,
  12. configuration.getCheckpointMode(),
  13. getCheckpointLock(),
  14. getEnvironment().getIOManager(),
  15. getEnvironment().getTaskManagerInfo().getConfiguration(),
  16. getStreamStatusMaintainer(),
  17. this.headOperator,
  18. getEnvironment().getMetricGroup().getIOMetricGroup(),
  19. inputWatermarkGauge);
  20. }
  21. headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  22. // wrap watermark gauge since registered metrics must be unique
  23. getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
  24. }

代码示例来源:origin: apache/flink

  1. this.config = config;
  2. try {
  3. OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
  4. this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
  5. if (config.isChainStart()) {

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
  4. sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
  5. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  6. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
  7. getEnvironment().getMetricGroup().addOperator(sourceName));
  8. }
  9. }

代码示例来源:origin: org.apache.flink/flink-runtime

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
  4. sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
  5. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  6. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
  7. getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
  8. }
  9. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
  4. sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
  5. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  6. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry(),
  7. getEnvironment().getMetricGroup().addOperator(sourceName));
  8. }
  9. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
  4. sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
  5. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  6. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
  7. getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
  8. }
  9. }

代码示例来源:origin: org.apache.flink/flink-streaming-java

  1. private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
  2. StreamEdge edge,
  3. int outputIndex,
  4. Environment environment,
  5. String taskName,
  6. long bufferTimeout) {
  7. @SuppressWarnings("unchecked")
  8. StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
  9. LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
  10. ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
  11. // we initialize the partitioner here with the number of key groups (aka max. parallelism)
  12. if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
  13. int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
  14. if (0 < numKeyGroups) {
  15. ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
  16. }
  17. }
  18. StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
  19. new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout, taskName);
  20. output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
  21. return output;
  22. }
  23. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  4. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry(),
  5. getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
  6. }
  7. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  4. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
  5. getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
  6. }
  7. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  4. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
  5. getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
  6. }
  7. }

代码示例来源:origin: org.apache.flink/flink-runtime

  1. public DistributedRuntimeUDFContext createRuntimeContext() {
  2. Environment env = getEnvironment();
  3. return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
  4. getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
  5. getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
  6. }
  7. }

代码示例来源:origin: org.apache.flink/flink-streaming-java

  1. @Override
  2. public void init() throws Exception {
  3. StreamConfig configuration = getConfiguration();
  4. TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  5. int numberOfInputs = configuration.getNumberOfInputs();
  6. if (numberOfInputs > 0) {
  7. InputGate[] inputGates = getEnvironment().getAllInputGates();
  8. inputProcessor = new StreamInputProcessor<>(
  9. inputGates,
  10. inSerializer,
  11. this,
  12. configuration.getCheckpointMode(),
  13. getCheckpointLock(),
  14. getEnvironment().getIOManager(),
  15. getEnvironment().getTaskManagerInfo().getConfiguration(),
  16. getStreamStatusMaintainer(),
  17. this.headOperator,
  18. getEnvironment().getMetricGroup().getIOMetricGroup(),
  19. inputWatermarkGauge);
  20. }
  21. headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  22. // wrap watermark gauge since registered metrics must be unique
  23. getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
  24. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

  1. @Override
  2. public void init() throws Exception {
  3. StreamConfig configuration = getConfiguration();
  4. TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  5. int numberOfInputs = configuration.getNumberOfInputs();
  6. if (numberOfInputs > 0) {
  7. InputGate[] inputGates = getEnvironment().getAllInputGates();
  8. inputProcessor = new StreamInputProcessor<>(
  9. inputGates,
  10. inSerializer,
  11. this,
  12. configuration.getCheckpointMode(),
  13. getCheckpointLock(),
  14. getEnvironment().getIOManager(),
  15. getEnvironment().getTaskManagerInfo().getConfiguration(),
  16. getStreamStatusMaintainer(),
  17. this.headOperator,
  18. getEnvironment().getMetricGroup().getIOMetricGroup(),
  19. inputWatermarkGauge);
  20. }
  21. headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
  22. // wrap watermark gauge since registered metrics must be unique
  23. getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
  24. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
  2. AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
  3. AbstractAccumulatorRegistry accumulatorRegistry)
  4. {
  5. this.config = config;
  6. this.taskName = taskName;
  7. this.userCodeClassLoader = userCodeClassLoader;
  8. this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
  9. this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
  10. this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
  11. this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
  12. Environment env = parent.getEnvironment();
  13. if (parent instanceof BatchTask) {
  14. this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
  15. } else {
  16. this.udfContext = new DistributedRuntimeUDFContext(env.getTaskInfo(), userCodeClassLoader,
  17. parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorRegistry, metrics);
  18. }
  19. this.executionConfig = executionConfig;
  20. this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
  21. setup(parent);
  22. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

  1. public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
  2. AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
  3. Map<String, Accumulator<?,?>> accumulatorMap)
  4. {
  5. this.config = config;
  6. this.taskName = taskName;
  7. this.userCodeClassLoader = userCodeClassLoader;
  8. this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
  9. this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
  10. this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
  11. this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
  12. Environment env = parent.getEnvironment();
  13. if (parent instanceof BatchTask) {
  14. this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
  15. } else {
  16. this.udfContext = new DistributedRuntimeUDFContext(env.getTaskInfo(), userCodeClassLoader,
  17. parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap, metrics
  18. );
  19. }
  20. this.executionConfig = executionConfig;
  21. this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
  22. setup(parent);
  23. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

  1. public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
  2. AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
  3. Map<String, Accumulator<?,?>> accumulatorMap)
  4. {
  5. this.config = config;
  6. this.taskName = taskName;
  7. this.userCodeClassLoader = userCodeClassLoader;
  8. this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
  9. this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
  10. this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
  11. this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
  12. Environment env = parent.getEnvironment();
  13. if (parent instanceof BatchTask) {
  14. this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(metrics);
  15. } else {
  16. this.udfContext = new DistributedRuntimeUDFContext(env.getTaskInfo(), userCodeClassLoader,
  17. parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap, metrics
  18. );
  19. }
  20. this.executionConfig = executionConfig;
  21. this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
  22. setup(parent);
  23. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

  1. @Override
  2. public void init() throws Exception {
  3. StreamConfig configuration = getConfiguration();
  4. TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
  5. int numberOfInputs = configuration.getNumberOfInputs();
  6. if (numberOfInputs > 0) {
  7. InputGate[] inputGates = getEnvironment().getAllInputGates();
  8. inputProcessor = new StreamInputProcessor<>(
  9. inputGates,
  10. inSerializer,
  11. this,
  12. configuration.getCheckpointMode(),
  13. getCheckpointLock(),
  14. getEnvironment().getIOManager(),
  15. getEnvironment().getTaskManagerInfo().getConfiguration(),
  16. getStreamStatusMaintainer(),
  17. this.headOperator);
  18. // make sure that stream tasks report their I/O statistics
  19. inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
  20. }
  21. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

  1. @Override
  2. public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
  3. this.container = containingTask;
  4. this.config = config;
  5. this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
  6. this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
  7. if (config.isChainStart()) {
  8. ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
  9. }
  10. if (config.isChainEnd()) {
  11. ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
  12. }
  13. Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
  14. int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
  15. if (historySize <= 0) {
  16. LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
  17. historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
  18. }
  19. latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
  20. this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
  21. stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
  22. stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
  23. }

相关文章