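This article collects code examples for the Java method org.apache.flink.runtime.execution.Environment.getTaskStateManager() and shows how Environment.getTaskStateManager() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and should serve as a useful reference. Details of the Environment.getTaskStateManager() method are as follows: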
Package path: org.apache.flink.runtime.execution.Environment
Class name: Environment
Method name: getTaskStateManager
No description available.
Code example source: origin: apache/flink
public StreamTaskStateInitializerImpl(
        Environment environment,
        StateBackend stateBackend,
        ProcessingTimeService processingTimeService) {
    this.environment = environment;
    this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
    this.stateBackend = Preconditions.checkNotNull(stateBackend);
    this.processingTimeService = processingTimeService;
}
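The constructor above reads the task state manager from the environment once and fails fast if it is missing. The following is a minimal, self-contained sketch of that pattern, not Flink code: `Env`, `StateManager`, and `Initializer` are hypothetical stand-ins for the Flink types, and `Objects.requireNonNull` from the JDK stands in for Flink's `Preconditions.checkNotNull`.

```java
import java.util.Objects;

public class FailFastSketch {

    /** Hypothetical stand-in for Flink's TaskStateManager. */
    interface StateManager {}

    /** Hypothetical stand-in for Flink's Environment. */
    interface Env {
        StateManager getTaskStateManager();
    }

    /** Resolves the dependency once in the constructor and rejects null immediately. */
    static final class Initializer {
        private final StateManager stateManager;

        Initializer(Env env) {
            this.stateManager = Objects.requireNonNull(
                    env.getTaskStateManager(), "task state manager must not be null");
        }

        StateManager stateManager() {
            return stateManager;
        }
    }

    /** Returns true if building an Initializer from env throws NullPointerException. */
    static boolean rejects(Env env) {
        try {
            new Initializer(env);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        StateManager manager = new StateManager() {};
        Env good = () -> manager;   // environment that provides a manager
        Env bad = () -> null;       // environment that provides none

        System.out.println(new Initializer(good).stateManager() == manager);
        System.out.println(rejects(bad));
    }
}
```

Checking in the constructor means a misconfigured environment surfaces at construction time instead of later, when the first state operation would otherwise hit a null reference.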
Code example source: origin: apache/flink
private void reportCompletedSnapshotStates(
        TaskStateSnapshot acknowledgedTaskStateSnapshot,
        TaskStateSnapshot localTaskStateSnapshot,
        long asyncDurationMillis) {
    TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
    boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
    boolean hasLocalState = localTaskStateSnapshot.hasState();
    Preconditions.checkState(hasAckState || !hasLocalState,
        "Found cached state but no corresponding primary state is reported to the job " +
            "manager. This indicates a problem.");
    // We signal stateless tasks by reporting null, so that there are no attempts to assign empty state
    // to stateless tasks on restore. This enables simple job modifications that only concern
    // stateless operators without the need to assign them uids to match their (always empty) states.
    taskStateManager.reportTaskStateSnapshots(
        checkpointMetaData,
        checkpointMetrics,
        hasAckState ? acknowledgedTaskStateSnapshot : null,
        hasLocalState ? localTaskStateSnapshot : null);
    LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
        owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
    LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
        owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
}
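The method above enforces one invariant (local state may only exist alongside acknowledged state) and replaces empty snapshots with null before reporting. The sketch below isolates that decision logic so it can be run on its own. `Snapshot`, `Report`, and `prepareReport` are hypothetical names introduced for illustration, and a plain `IllegalStateException` is thrown where the original calls `Preconditions.checkState`.

```java
public class SnapshotReportSketch {

    /** Hypothetical stand-in for TaskStateSnapshot: only knows whether it holds state. */
    static final class Snapshot {
        private final boolean hasState;

        Snapshot(boolean hasState) {
            this.hasState = hasState;
        }

        boolean hasState() {
            return hasState;
        }
    }

    /** The pair that would be handed to the state manager; null means "nothing to report". */
    static final class Report {
        final Snapshot acknowledged;
        final Snapshot local;

        Report(Snapshot acknowledged, Snapshot local) {
            this.acknowledged = acknowledged;
            this.local = local;
        }
    }

    /** Applies the same check and null substitution as the method above. */
    static Report prepareReport(Snapshot acknowledged, Snapshot local) {
        boolean hasAckState = acknowledged.hasState();
        boolean hasLocalState = local.hasState();

        // Local (cached) state without primary state is treated as an error.
        if (!(hasAckState || !hasLocalState)) {
            throw new IllegalStateException(
                    "Found cached state but no corresponding primary state");
        }

        // Empty snapshots are reported as null so stateless tasks carry no state at all.
        return new Report(
                hasAckState ? acknowledged : null,
                hasLocalState ? local : null);
    }

    public static void main(String[] args) {
        Report stateless = prepareReport(new Snapshot(false), new Snapshot(false));
        System.out.println(stateless.acknowledged == null && stateless.local == null);

        Report ackOnly = prepareReport(new Snapshot(true), new Snapshot(false));
        System.out.println(ackOnly.acknowledged != null && ackOnly.local == null);
    }
}
```

Of the four combinations of the two flags, only "local state without acknowledged state" is rejected; the other three pass through with empty snapshots turned into null.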
Code example source: origin: apache/flink
env.getTaskStateManager().createLocalRecoveryConfig();
Code example source: origin: apache/flink
checkpointResponder);
when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager);
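The fragment above uses a Mockito-style `when(...).thenReturn(...)` stub so that a mocked environment hands back a prepared task state manager. The same idea can be sketched without a mocking library by implementing a small interface by hand. Everything in this sketch (`Env`, `StateManager`, `describe`) is a hypothetical stand-in rather than a Flink or Mockito API.

```java
public class StubEnvironmentSketch {

    /** Hypothetical stand-in for Flink's TaskStateManager. */
    interface StateManager {
        String name();
    }

    /** Hypothetical stand-in for Flink's Environment. */
    interface Env {
        StateManager getTaskStateManager();
    }

    /** Code under test: depends only on the Env interface. */
    static String describe(Env env) {
        return "manager=" + env.getTaskStateManager().name();
    }

    public static void main(String[] args) {
        StateManager prepared = () -> "test-manager";

        // Hand-written stub: always returns the prepared manager,
        // which is the effect when(...).thenReturn(prepared) has on a mock.
        Env stubEnv = () -> prepared;

        System.out.println(describe(stubEnv));
    }
}
```

A hand-written stub like this only works when the dependency is a small interface; for a wide interface such as a real task environment, a mocking library avoids implementing every method.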
Code example source: origin: com.alibaba.blink/flink-runtime
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry) {
    TaskStateManager taskStateManager = env.getTaskStateManager();
    return new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        isUsingAsynchronousSnapshots(),
        env.getExecutionConfig(),
        taskStateManager.createLocalRecoveryConfig());
}
Code example source: origin: com.alibaba.blink/flink-runtime
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry) {
    TaskStateManager taskStateManager = env.getTaskStateManager();
    LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
    return new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        isUsingAsynchronousSnapshots(),
        env.getExecutionConfig(),
        localRecoveryConfig);
}
Code example source: origin: com.alibaba.blink/flink-runtime
@Override
public AbstractInternalStateBackend createInternalStateBackend(
        Environment env,
        String operatorIdentifier,
        int numberOfGroups,
        KeyGroupRange keyGroupRange) {
    return new HeapInternalStateBackend(
        numberOfGroups,
        keyGroupRange,
        env.getUserClassLoader(),
        env.getTaskStateManager().createLocalRecoveryConfig(),
        env.getTaskKvStateRegistry(),
        isUsingAsynchronousSnapshots(),
        env.getExecutionConfig()
    );
}
Code example source: origin: org.apache.flink/flink-runtime_2.11
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) {
    TaskStateManager taskStateManager = env.getTaskStateManager();
    HeapPriorityQueueSetFactory priorityQueueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
    return new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        isUsingAsynchronousSnapshots(),
        env.getExecutionConfig(),
        taskStateManager.createLocalRecoveryConfig(),
        priorityQueueSetFactory,
        ttlTimeProvider);
}
Code example source: origin: org.apache.flink/flink-runtime
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) {
    TaskStateManager taskStateManager = env.getTaskStateManager();
    LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
    HeapPriorityQueueSetFactory priorityQueueSetFactory =
        new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
    return new HeapKeyedStateBackend<>(
        kvStateRegistry,
        keySerializer,
        env.getUserClassLoader(),
        numberOfKeyGroups,
        keyGroupRange,
        isUsingAsynchronousSnapshots(),
        env.getExecutionConfig(),
        localRecoveryConfig,
        priorityQueueSetFactory,
        ttlTimeProvider);
}
Code example source: origin: org.apache.flink/flink-statebackend-rocksdb_2.11
env.getTaskStateManager().createLocalRecoveryConfig();