本文整理了Java中org.apache.flink.runtime.execution.Environment.getMetricGroup()方法的一些代码示例,展示了Environment.getMetricGroup()的具体用法。
这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getMetricGroup()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getMetricGroup
[英]Returns the task specific metric group.
[中]返回特定于任务的度量值组。
代码示例来源:origin: apache/flink
/**
 * Creates the {@code RecordWriter} for one output of a task.
 *
 * <p>The partitioner is taken from the given edge and the target writer is looked up in the
 * environment by {@code outputIndex}. The resulting writer reports to the task's I/O metric group.
 *
 * @param edge          edge whose partitioner is used for the output
 * @param outputIndex   index of the writer to fetch from the environment
 * @param environment   environment providing the writer and the metric group
 * @param taskName      task name, used for logging and passed to the record writer
 * @param bufferTimeout timeout value passed through to the record writer
 * @return the record writer, with its metric group already set
 */
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {

    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> partitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
    LOG.debug("Using partitioner {} for output {} of task {}", partitioner, outputIndex, taskName);

    final ResultPartitionWriter partitionWriter = environment.getWriter(outputIndex);

    // Configurable partitioners receive the number of key groups (aka max. parallelism),
    // but only when the writer reports a positive count.
    if (partitioner instanceof ConfigurableStreamPartitioner) {
        final int keyGroupCount = partitionWriter.getNumTargetKeyGroups();
        if (keyGroupCount > 0) {
            ((ConfigurableStreamPartitioner) partitioner).configure(keyGroupCount);
        }
    }

    final RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter =
            RecordWriter.createRecordWriter(partitionWriter, partitioner, bufferTimeout, taskName);
    recordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return recordWriter;
}
}
代码示例来源:origin: apache/flink
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
input1WatermarkGauge,
input2WatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
代码示例来源:origin: apache/flink
/**
 * Prepares input processing for this task and registers the input watermark gauge.
 *
 * <p>A {@code StreamInputProcessor} is created only when the stream configuration reports at
 * least one input. The gauge registrations happen regardless of the number of inputs.
 *
 * @throws Exception declared by the signature; failures of the invoked calls propagate
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() > 0) {
        final InputGate[] gates = getEnvironment().getAllInputGates();

        inputProcessor = new StreamInputProcessor<>(
                gates,
                inputSerializer,
                this,
                streamConfig.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
    }

    // The head operator's group gets the gauge object itself ...
    headOperator.getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    // ... while the task group gets a wrapper, since registered metrics must be unique.
    getEnvironment().getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
代码示例来源:origin: apache/flink
this.config = config;
try {
OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
if (config.isChainStart()) {
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task.
 *
 * <p>The operator metric group is registered under a name derived from the task name: only the
 * part before the first {@code "->"} is kept (trimmed), and a name starting with
 * {@code "CHAIN"} has its first six characters removed.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    // substring(6) drops "CHAIN" plus one more character (presumably a separator - confirm
    // against the code that builds chained task names). The length check keeps a name that is
    // exactly "CHAIN" from throwing StringIndexOutOfBoundsException; such a name is used as is.
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
            getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().addOperator(sourceName));
}
}
代码示例来源:origin: org.apache.flink/flink-runtime
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task.
 *
 * <p>The operator metric group is obtained under a name derived from the task name: only the
 * part before the first {@code "->"} is kept (trimmed), and a name starting with
 * {@code "CHAIN"} has its first six characters removed.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    // substring(6) drops "CHAIN" plus one more character (presumably a separator - confirm
    // against the code that builds chained task names). The length check keeps a name that is
    // exactly "CHAIN" from throwing StringIndexOutOfBoundsException; such a name is used as is.
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
            getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
}
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task.
 *
 * <p>The operator metric group is registered under a name derived from the task name: only the
 * part before the first {@code "->"} is kept (trimmed), and a name starting with
 * {@code "CHAIN"} has its first six characters removed.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    // substring(6) drops "CHAIN" plus one more character (presumably a separator - confirm
    // against the code that builds chained task names). The length check keeps a name that is
    // exactly "CHAIN" from throwing StringIndexOutOfBoundsException; such a name is used as is.
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
            getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry(),
            getEnvironment().getMetricGroup().addOperator(sourceName));
}
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task.
 *
 * <p>The operator metric group is obtained under a name derived from the task name: only the
 * part before the first {@code "->"} is kept (trimmed), and a name starting with
 * {@code "CHAIN"} has its first six characters removed.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    Environment env = getEnvironment();

    String sourceName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
    // substring(6) drops "CHAIN" plus one more character (presumably a separator - confirm
    // against the code that builds chained task names). The length check keeps a name that is
    // exactly "CHAIN" from throwing StringIndexOutOfBoundsException; such a name is used as is.
    if (sourceName.startsWith("CHAIN") && sourceName.length() >= 6) {
        sourceName = sourceName.substring(6);
    }

    return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
            getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
}
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
/**
 * Creates the {@code StreamRecordWriter} for one output of a task.
 *
 * <p>The partitioner comes from the given edge; the underlying writer is fetched from the
 * environment by {@code outputIndex}. The returned writer has the task's I/O metric group set.
 *
 * @param edge          edge whose partitioner is used for the output
 * @param outputIndex   index of the writer to fetch from the environment
 * @param environment   environment providing the writer and the metric group
 * @param taskName      task name, used for logging and passed to the record writer
 * @param bufferTimeout timeout value passed through to the record writer
 * @return the stream record writer, with its metric group already set
 */
private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
        StreamEdge edge,
        int outputIndex,
        Environment environment,
        String taskName,
        long bufferTimeout) {

    @SuppressWarnings("unchecked")
    final StreamPartitioner<OUT> partitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
    LOG.debug("Using partitioner {} for output {} of task {}", partitioner, outputIndex, taskName);

    final ResultPartitionWriter targetWriter = environment.getWriter(outputIndex);

    // Hand the number of key groups (aka max. parallelism) to partitioners that accept it;
    // a non-positive count leaves the partitioner unconfigured.
    if (partitioner instanceof ConfigurableStreamPartitioner) {
        final int keyGroupCount = targetWriter.getNumTargetKeyGroups();
        if (keyGroupCount > 0) {
            ((ConfigurableStreamPartitioner) partitioner).configure(keyGroupCount);
        }
    }

    final StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamWriter =
            new StreamRecordWriter<>(targetWriter, partitioner, bufferTimeout, taskName);
    streamWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    return streamWriter;
}
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task, registering the operator
 * metric group under the full task name.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();
    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry(),
            getEnvironment().getMetricGroup().addOperator(
                    getEnvironment().getTaskInfo().getTaskName()));
}
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task, registering the operator
 * metric group under the full task name.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();
    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().addOperator(
                    getEnvironment().getTaskInfo().getTaskName()));
}
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task, obtaining the operator
 * metric group under the full task name.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();
    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().getOrAddOperator(
                    getEnvironment().getTaskInfo().getTaskName()));
}
}
代码示例来源:origin: org.apache.flink/flink-runtime
/**
 * Builds the {@code DistributedRuntimeUDFContext} for this task, obtaining the operator
 * metric group under the full task name.
 *
 * @return a new runtime context built from this task's environment
 */
public DistributedRuntimeUDFContext createRuntimeContext() {
    final Environment environment = getEnvironment();
    return new DistributedRuntimeUDFContext(
            environment.getTaskInfo(),
            getUserCodeClassLoader(),
            getExecutionConfig(),
            environment.getDistributedCacheEntries(),
            environment.getAccumulatorRegistry().getUserMap(),
            getEnvironment().getMetricGroup().getOrAddOperator(
                    getEnvironment().getTaskInfo().getTaskName()));
}
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
/**
 * Prepares input processing for this task and registers the input watermark gauge.
 *
 * <p>The {@code StreamInputProcessor} is built only if the configuration reports a positive
 * number of inputs; the two gauge registrations are performed in every case.
 *
 * @throws Exception declared by the signature; failures of the invoked calls propagate
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());
    final int inputCount = streamConfig.getNumberOfInputs();

    if (inputCount > 0) {
        final InputGate[] gates = getEnvironment().getAllInputGates();

        inputProcessor = new StreamInputProcessor<>(
                gates,
                inputSerializer,
                this,
                streamConfig.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
    }

    // Operator-level registration uses the gauge instance directly.
    headOperator.getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    // Task-level registration wraps the gauge, since registered metrics must be unique.
    getEnvironment().getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11
/**
 * Prepares input processing for this task and registers the input watermark gauge.
 *
 * <p>The {@code StreamInputProcessor} is built only if the configuration reports a positive
 * number of inputs; the two gauge registrations are performed in every case.
 *
 * @throws Exception declared by the signature; failures of the invoked calls propagate
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());
    final int inputCount = streamConfig.getNumberOfInputs();

    if (inputCount > 0) {
        final InputGate[] gates = getEnvironment().getAllInputGates();

        inputProcessor = new StreamInputProcessor<>(
                gates,
                inputSerializer,
                this,
                streamConfig.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
    }

    // Operator-level registration uses the gauge instance directly.
    headOperator.getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    // Task-level registration wraps the gauge, since registered metrics must be unique.
    getEnvironment().getMetricGroup().gauge(
            MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
/**
 * Initializes this driver: stores the configuration, registers an operator metric group
 * named after the task, wraps the output collector so emitted records are counted, and
 * creates the runtime context.
 *
 * @param config              task configuration to store
 * @param taskName            task name, also used as the operator metric group name
 * @param outputCollector     collector to wrap in a counting collector
 * @param parent              invokable supplying the environment; a {@code BatchTask}
 *                            creates the runtime context itself
 * @param userCodeClassLoader class loader to store and pass to the runtime context
 * @param executionConfig     execution config to store; also supplies the object-reuse flag
 * @param accumulatorRegistry registry passed to the runtime context when {@code parent} is
 *                            not a {@code BatchTask}
 */
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
        AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
        AbstractAccumulatorRegistry accumulatorRegistry) {

    this.config = config;
    this.taskName = taskName;
    this.userCodeClassLoader = userCodeClassLoader;

    // Metrics: operator group plus its record counters.
    this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
    this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
    this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
    this.outputCollector = new CountingCollector<>(outputCollector, this.numRecordsOut);

    final Environment environment = parent.getEnvironment();
    if (!(parent instanceof BatchTask)) {
        this.udfContext = new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                userCodeClassLoader,
                parent.getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                accumulatorRegistry,
                this.metrics);
    } else {
        this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(this.metrics);
    }

    this.executionConfig = executionConfig;
    this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    setup(parent);
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Initializes this driver: stores the configuration, obtains the operator metric group
 * named after the task, wraps the output collector so emitted records are counted, and
 * creates the runtime context.
 *
 * @param config              task configuration to store
 * @param taskName            task name, also used as the operator metric group name
 * @param outputCollector     collector to wrap in a counting collector
 * @param parent              invokable supplying the environment; a {@code BatchTask}
 *                            creates the runtime context itself
 * @param userCodeClassLoader class loader to store and pass to the runtime context
 * @param executionConfig     execution config to store; also supplies the object-reuse flag
 * @param accumulatorMap      accumulators passed to the runtime context when {@code parent}
 *                            is not a {@code BatchTask}
 */
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
        AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
        Map<String, Accumulator<?,?>> accumulatorMap) {

    this.config = config;
    this.taskName = taskName;
    this.userCodeClassLoader = userCodeClassLoader;

    // Metrics: operator group plus its record counters.
    this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
    this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
    this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
    this.outputCollector = new CountingCollector<>(outputCollector, this.numRecordsOut);

    final Environment environment = parent.getEnvironment();
    if (!(parent instanceof BatchTask)) {
        this.udfContext = new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                userCodeClassLoader,
                parent.getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                accumulatorMap,
                this.metrics);
    } else {
        this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(this.metrics);
    }

    this.executionConfig = executionConfig;
    this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    setup(parent);
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Initializes this driver: stores the configuration, registers an operator metric group
 * named after the task, wraps the output collector so emitted records are counted, and
 * creates the runtime context.
 *
 * @param config              task configuration to store
 * @param taskName            task name, also used as the operator metric group name
 * @param outputCollector     collector to wrap in a counting collector
 * @param parent              invokable supplying the environment; a {@code BatchTask}
 *                            creates the runtime context itself
 * @param userCodeClassLoader class loader to store and pass to the runtime context
 * @param executionConfig     execution config to store; also supplies the object-reuse flag
 * @param accumulatorMap      accumulators passed to the runtime context when {@code parent}
 *                            is not a {@code BatchTask}
 */
public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
        AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
        Map<String, Accumulator<?,?>> accumulatorMap) {

    this.config = config;
    this.taskName = taskName;
    this.userCodeClassLoader = userCodeClassLoader;

    // Metrics: operator group plus its record counters.
    this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
    this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
    this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
    this.outputCollector = new CountingCollector<>(outputCollector, this.numRecordsOut);

    final Environment environment = parent.getEnvironment();
    if (!(parent instanceof BatchTask)) {
        this.udfContext = new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                userCodeClassLoader,
                parent.getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                accumulatorMap,
                this.metrics);
    } else {
        this.udfContext = ((BatchTask<?, ?>) parent).createRuntimeContext(this.metrics);
    }

    this.executionConfig = executionConfig;
    this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    setup(parent);
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
 * Prepares input processing for this task.
 *
 * <p>Nothing beyond reading the configuration happens when it reports no inputs. Otherwise a
 * {@code StreamInputProcessor} is created and given the task's I/O metric group.
 *
 * @throws Exception declared by the signature; failures of the invoked calls propagate
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer =
            streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());
    final int inputCount = streamConfig.getNumberOfInputs();

    if (inputCount <= 0) {
        return;
    }

    final InputGate[] gates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<>(
            gates,
            inputSerializer,
            this,
            streamConfig.getCheckpointMode(),
            getCheckpointLock(),
            getEnvironment().getIOManager(),
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getStreamStatusMaintainer(),
            this.headOperator);

    // make sure that stream tasks report their I/O statistics
    inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
 * Initializes this operator: stores the containing task and configuration, registers the
 * operator metric group, wraps the output so emitted records are counted, registers the
 * latency gauge, and creates the runtime context and state key selectors.
 *
 * @param containingTask task that contains this operator
 * @param config         stream configuration of this operator
 * @param output         output to wrap in a counting output
 */
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    this.container = containingTask;
    this.config = config;

    this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
    final OperatorMetricGroup operatorMetrics = (OperatorMetricGroup) this.metrics;
    this.output = new CountingOutput(output, operatorMetrics.getIOMetricGroup().getNumRecordsOutCounter());

    // Operators at the chain boundaries share their I/O counters with the task.
    if (config.isChainStart()) {
        operatorMetrics.getIOMetricGroup().reuseInputMetricsForTask();
    }
    if (config.isChainEnd()) {
        operatorMetrics.getIOMetricGroup().reuseOutputMetricsForTask();
    }

    final Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    final int configuredHistorySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
    final int historySize;
    if (configuredHistorySize > 0) {
        historySize = configuredHistorySize;
    } else {
        // Non-positive values are rejected in favour of the option's default.
        LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, configuredHistorySize);
        historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
    }
    latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));

    this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());

    stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
    stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
}
内容来源于网络,如有侵权,请联系作者删除!