本文整理了Java中org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()方法的一些代码示例,展示了Environment.getTaskManagerInfo()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Environment.getTaskManagerInfo()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getTaskManagerInfo
[英]Gets the task manager info, with configuration and hostname.
[中]获取任务管理器信息,包括配置和主机名。
代码示例来源:origin: apache/flink
/**
 * Shuts down the timer service on a best-effort basis.
 *
 * <p>Does nothing when no timer service is set or when it has already terminated. Otherwise
 * the shutdown is given the {@code TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS} value
 * read from the task manager configuration as its time limit; exceeding that limit is only
 * reported as a warning. Any {@link Throwable} raised along the way is logged and not rethrown.
 */
private void tryShutdownTimerService() {
    if (timerService == null || timerService.isTerminated()) {
        // nothing left to shut down
        return;
    }

    try {
        final long timeoutMs = getEnvironment()
            .getTaskManagerInfo()
            .getConfiguration()
            .getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);

        final boolean shutdownCompleted = timerService.shutdownServiceUninterruptible(timeoutMs);
        if (!shutdownCompleted) {
            LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
                "timers. Will continue with shutdown procedure.", timeoutMs);
        }
    } catch (Throwable t) {
        // log instead of rethrowing so that this failure does not replace the original exception
        LOG.error("Could not shut down timer service", t);
    }
}
代码示例来源:origin: apache/flink
/**
 * Resolves the state backend for this task.
 *
 * <p>The backend obtained from {@code configuration} (using the user code class loader) and the
 * task manager configuration are both handed to
 * {@code StateBackendLoader.fromApplicationOrConfigOrDefault}, which makes the final choice.
 *
 * @return the state backend returned by the loader
 * @throws Exception if looking up or loading the state backend fails
 */
private StateBackend createStateBackend() throws Exception {
    return StateBackendLoader.fromApplicationOrConfigOrDefault(
        configuration.getStateBackend(getUserCodeClassLoader()),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getUserCodeClassLoader(),
        LOG);
}
代码示例来源:origin: apache/flink
// Take the first entry of the task manager's temp directories.
// NOTE(review): assumes at least one temp directory is configured; index 0 fails on an empty array.
String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
// Hand that directory to the RocksDB loading helper (presumably where the native library is
// extracted; confirm against ensureRocksDBIsLoaded).
ensureRocksDBIsLoaded(tempDir);
代码示例来源:origin: apache/flink
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator,
代码示例来源:origin: apache/flink
/**
 * Consumes every record from input gate 0 until the reader reports no more records.
 *
 * <p>When the task configuration flag {@code IS_SLOW_RECEIVER_CONFIG_KEY} is set, the task
 * sleeps for {@code IS_SLOW_SLEEP_MS} each time the running record counter is a multiple of
 * {@code IS_SLOW_EVERY_NUM_RECORDS}. The reader's buffers are cleared on every exit path.
 *
 * @throws Exception if reading fails or the sleep is interrupted
 */
@Override
public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());

    try {
        final boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);

        // the counter only advances in slow mode
        for (int slowModeCounter = 0; reader.next() != null; ) {
            if (!isSlow) {
                continue;
            }
            if ((slowModeCounter++ % IS_SLOW_EVERY_NUM_RECORDS) == 0) {
                Thread.sleep(IS_SLOW_SLEEP_MS);
            }
        }
    } finally {
        reader.clearBuffers();
    }
}
}
代码示例来源:origin: apache/flink
Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
if (historySize <= 0) {
代码示例来源:origin: apache/flink
/**
 * Prepares the task's input side.
 *
 * <p>When the stream configuration reports at least one input, a {@code StreamInputProcessor}
 * is created over all input gates of the environment. Regardless of the number of inputs, the
 * input watermark gauge is registered on both the head operator's metric group and the task's
 * metric group.
 *
 * @throws Exception if building the input processor fails
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer = streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() > 0) {
        inputProcessor = new StreamInputProcessor<>(
            getEnvironment().getAllInputGates(),
            inputSerializer,
            this,
            streamConfig.getCheckpointMode(),
            getCheckpointLock(),
            getEnvironment().getIOManager(),
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getStreamStatusMaintainer(),
            this.headOperator,
            getEnvironment().getMetricGroup().getIOMetricGroup(),
            inputWatermarkGauge);
    }

    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    // registered metrics must be unique, so the task-level registration wraps the gauge
    getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
代码示例来源:origin: apache/flink
/**
 * Forwards every record read from input gate 0 to writer 0 until the reader reports no more
 * records.
 *
 * <p>On every exit path the reader's buffers are cleared, then the writer's buffers are
 * cleared, and finally the writer is flushed.
 *
 * @throws Exception if reading or emitting a record fails
 */
@Override
public void invoke() throws Exception {
    RecordReader<SpeedTestRecord> reader = new RecordReader<>(
        getEnvironment().getInputGate(0),
        SpeedTestRecord.class,
        getEnvironment().getTaskManagerInfo().getTmpDirectories());
    RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));

    try {
        for (SpeedTestRecord next = reader.next(); next != null; next = reader.next()) {
            writer.emit(next);
        }
    } finally {
        // cleanup order is kept exactly as in the original implementation
        reader.clearBuffers();
        writer.clearBuffers();
        writer.flushAll();
    }
}
}
代码示例来源:origin: apache/flink
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
/**
 * Returns the task manager runtime info exposed by this task's environment.
 *
 * @return the value of {@code getEnvironment().getTaskManagerInfo()}
 */
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    final TaskManagerRuntimeInfo runtimeInfo = getEnvironment().getTaskManagerInfo();
    return runtimeInfo;
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
/**
 * Delegates to the environment to obtain the task manager runtime info.
 *
 * @return the value of {@code getEnvironment().getTaskManagerInfo()}
 */
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    final TaskManagerRuntimeInfo infoFromEnvironment = getEnvironment().getTaskManagerInfo();
    return infoFromEnvironment;
}
代码示例来源:origin: org.apache.flink/flink-runtime
/**
 * Looks up the task manager runtime info through the environment.
 *
 * @return the value of {@code getEnvironment().getTaskManagerInfo()}
 */
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    final TaskManagerRuntimeInfo taskManagerInfo = getEnvironment().getTaskManagerInfo();
    return taskManagerInfo;
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
/**
 * Provides the task manager runtime info held by the environment.
 *
 * @return the value of {@code getEnvironment().getTaskManagerInfo()}
 */
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    final TaskManagerRuntimeInfo info = getEnvironment().getTaskManagerInfo();
    return info;
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
/**
 * Stops the timer service if one is present and still running; never throws.
 *
 * <p>The time limit passed to the shutdown call is the
 * {@code TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS} value from the task manager
 * configuration. A shutdown that exceeds the limit produces a warning, and any
 * {@link Throwable} raised during the attempt is logged as an error and suppressed.
 */
private void tryShutdownTimerService() {
    if (timerService == null) {
        return;
    }
    if (timerService.isTerminated()) {
        return;
    }

    try {
        final long timeoutMs = getEnvironment()
            .getTaskManagerInfo()
            .getConfiguration()
            .getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);

        final boolean finishedInTime = timerService.shutdownServiceUninterruptible(timeoutMs);
        if (!finishedInTime) {
            LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
                "timers. Will continue with shutdown procedure.", timeoutMs);
        }
    } catch (Throwable t) {
        // swallow after logging so that the original exception is not replaced
        LOG.error("Could not shut down timer service", t);
    }
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11
/**
 * Best-effort shutdown of the timer service.
 *
 * <p>Returns immediately when there is no timer service or it is already terminated. The
 * shutdown call receives the {@code TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS}
 * setting of the task manager configuration as its time limit. Overrunning the limit is
 * logged as a warning, and every {@link Throwable} is logged and not propagated.
 */
private void tryShutdownTimerService() {
    final boolean nothingToShutDown = timerService == null || timerService.isTerminated();
    if (nothingToShutDown) {
        return;
    }

    try {
        final long timeoutMs = getEnvironment()
            .getTaskManagerInfo()
            .getConfiguration()
            .getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);

        if (timerService.shutdownServiceUninterruptible(timeoutMs)) {
            return;
        }
        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
            "timers. Will continue with shutdown procedure.", timeoutMs);
    } catch (Throwable t) {
        // logging only: rethrowing here would replace the original exception
        LOG.error("Could not shut down timer service", t);
    }
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
 * Picks the state backend for this task.
 *
 * <p>A backend obtained from {@code configuration} (using the user code class loader) wins and
 * is logged. When none is present, the backend is loaded through
 * {@code AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault} using the task manager
 * configuration.
 *
 * @return the backend from the job when present, otherwise the one produced by the loader
 * @throws Exception if looking up or loading the state backend fails
 */
private StateBackend createStateBackend() throws Exception {
    final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader());

    if (fromJob == null) {
        // nothing configured on the environment: defer to the task manager configuration
        return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getUserCodeClassLoader(),
            LOG);
    }

    LOG.info("Using user-defined state backend: {}.", fromJob);
    return fromJob;
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11
/**
 * Determines the state backend to use for this task.
 *
 * <p>Passes the backend obtained from {@code configuration} together with the task manager
 * configuration, the user code class loader and the logger to
 * {@code StateBackendLoader.fromApplicationOrConfigOrDefault} and returns its result.
 *
 * @return the state backend returned by the loader
 * @throws Exception if looking up or loading the state backend fails
 */
private StateBackend createStateBackend() throws Exception {
    return StateBackendLoader.fromApplicationOrConfigOrDefault(
        configuration.getStateBackend(getUserCodeClassLoader()),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getUserCodeClassLoader(),
        LOG);
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
/**
 * Builds the state backend for this task via {@code StateBackendLoader}.
 *
 * <p>The loader receives the backend read from {@code configuration} and the task manager
 * configuration, and decides which one to use.
 *
 * @return the state backend returned by the loader
 * @throws Exception if looking up or loading the state backend fails
 */
private StateBackend createStateBackend() throws Exception {
    return StateBackendLoader.fromApplicationOrConfigOrDefault(
        configuration.getStateBackend(getUserCodeClassLoader()),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getUserCodeClassLoader(),
        LOG);
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
 * Wires this operator to its containing task.
 *
 * <p>Stores the task and config, registers an operator metric group under the configured
 * operator name, wraps the output in a {@code CountingOutput}, and reuses the task's input and
 * output metrics when the operator is at the start or end of the chain. It then creates the
 * latency gauge, sized by {@code MetricOptions.LATENCY_HISTORY_SIZE} from the task manager
 * configuration (falling back to the option's default when the value is not positive), the
 * runtime context, and the two state key selectors.
 *
 * @param containingTask the task this operator runs in
 * @param config the stream configuration of this operator
 * @param output the output that the counting wrapper delegates to
 */
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    this.container = containingTask;
    this.config = config;
    this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());

    // cast once instead of repeating it at every use
    final OperatorMetricGroup operatorMetrics = (OperatorMetricGroup) this.metrics;
    this.output = new CountingOutput(output, operatorMetrics.getIOMetricGroup().getNumRecordsOutCounter());
    if (config.isChainStart()) {
        operatorMetrics.getIOMetricGroup().reuseInputMetricsForTask();
    }
    if (config.isChainEnd()) {
        operatorMetrics.getIOMetricGroup().reuseOutputMetricsForTask();
    }

    final Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    final int configuredHistorySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
    final int historySize;
    if (configuredHistorySize > 0) {
        historySize = configuredHistorySize;
    } else {
        LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, configuredHistorySize);
        historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
    }
    latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));

    this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());

    stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
    stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
 * Prepares the task's input side.
 *
 * <p>When the stream configuration reports at least one input, a {@code StreamInputProcessor}
 * is created over all input gates of the environment and is given the task's I/O metric group.
 * With no inputs, nothing is set up.
 *
 * @throws Exception if building the input processor fails
 */
@Override
public void init() throws Exception {
    final StreamConfig streamConfig = getConfiguration();
    final TypeSerializer<IN> inputSerializer = streamConfig.getTypeSerializerIn1(getUserCodeClassLoader());

    if (streamConfig.getNumberOfInputs() <= 0) {
        return;
    }

    inputProcessor = new StreamInputProcessor<>(
        getEnvironment().getAllInputGates(),
        inputSerializer,
        this,
        streamConfig.getCheckpointMode(),
        getCheckpointLock(),
        getEnvironment().getIOManager(),
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getStreamStatusMaintainer(),
        this.headOperator);

    // make sure that stream tasks report their I/O statistics
    inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
}
内容来源于网络,如有侵权,请联系作者删除!