org.apache.flink.runtime.execution.Environment.getTaskStateManager()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(184)

本文整理了Java中org.apache.flink.runtime.execution.Environment.getTaskStateManager()方法的一些代码示例,展示了Environment.getTaskStateManager()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Environment.getTaskStateManager()方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getTaskStateManager

Environment.getTaskStateManager介绍

暂无

代码示例

代码示例来源:origin: apache/flink

  1. public StreamTaskStateInitializerImpl(
  2. Environment environment,
  3. StateBackend stateBackend,
  4. ProcessingTimeService processingTimeService) {
  5. this.environment = environment;
  6. this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
  7. this.stateBackend = Preconditions.checkNotNull(stateBackend);
  8. this.processingTimeService = processingTimeService;
  9. }

代码示例来源:origin: apache/flink

  1. private void reportCompletedSnapshotStates(
  2. TaskStateSnapshot acknowledgedTaskStateSnapshot,
  3. TaskStateSnapshot localTaskStateSnapshot,
  4. long asyncDurationMillis) {
  5. TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
  6. boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
  7. boolean hasLocalState = localTaskStateSnapshot.hasState();
  8. Preconditions.checkState(hasAckState || !hasLocalState,
  9. "Found cached state but no corresponding primary state is reported to the job " +
  10. "manager. This indicates a problem.");
  11. // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
  12. // to stateless tasks on restore. This enables simple job modifications that only concern
  13. // stateless without the need to assign them uids to match their (always empty) states.
  14. taskStateManager.reportTaskStateSnapshots(
  15. checkpointMetaData,
  16. checkpointMetrics,
  17. hasAckState ? acknowledgedTaskStateSnapshot : null,
  18. hasLocalState ? localTaskStateSnapshot : null);
  19. LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
  20. owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
  21. LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
  22. owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
  23. }

代码示例来源:origin: apache/flink

  1. env.getTaskStateManager().createLocalRecoveryConfig();

代码示例来源:origin: apache/flink

  1. checkpointResponder);
  2. when(mockEnvironment.getTaskStateManager()).thenReturn(taskStateManager);

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

  1. public StreamTaskStateInitializerImpl(
  2. Environment environment,
  3. StateBackend stateBackend,
  4. ProcessingTimeService processingTimeService) {
  5. this.environment = environment;
  6. this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
  7. this.stateBackend = Preconditions.checkNotNull(stateBackend);
  8. this.processingTimeService = processingTimeService;
  9. }

代码示例来源:origin: org.apache.flink/flink-streaming-java

  1. public StreamTaskStateInitializerImpl(
  2. Environment environment,
  3. StateBackend stateBackend,
  4. ProcessingTimeService processingTimeService) {
  5. this.environment = environment;
  6. this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
  7. this.stateBackend = Preconditions.checkNotNull(stateBackend);
  8. this.processingTimeService = processingTimeService;
  9. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. @Override
  2. public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  3. Environment env,
  4. JobID jobID,
  5. String operatorIdentifier,
  6. TypeSerializer<K> keySerializer,
  7. int numberOfKeyGroups,
  8. KeyGroupRange keyGroupRange,
  9. TaskKvStateRegistry kvStateRegistry) {
  10. TaskStateManager taskStateManager = env.getTaskStateManager();
  11. return new HeapKeyedStateBackend<>(
  12. kvStateRegistry,
  13. keySerializer,
  14. env.getUserClassLoader(),
  15. numberOfKeyGroups,
  16. keyGroupRange,
  17. isUsingAsynchronousSnapshots(),
  18. env.getExecutionConfig(),
  19. taskStateManager.createLocalRecoveryConfig());
  20. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. @Override
  2. public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  3. Environment env,
  4. JobID jobID,
  5. String operatorIdentifier,
  6. TypeSerializer<K> keySerializer,
  7. int numberOfKeyGroups,
  8. KeyGroupRange keyGroupRange,
  9. TaskKvStateRegistry kvStateRegistry) {
  10. TaskStateManager taskStateManager = env.getTaskStateManager();
  11. LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
  12. return new HeapKeyedStateBackend<>(
  13. kvStateRegistry,
  14. keySerializer,
  15. env.getUserClassLoader(),
  16. numberOfKeyGroups,
  17. keyGroupRange,
  18. isUsingAsynchronousSnapshots(),
  19. env.getExecutionConfig(),
  20. localRecoveryConfig);
  21. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. @Override
  2. public AbstractInternalStateBackend createInternalStateBackend(
  3. Environment env,
  4. String operatorIdentifier,
  5. int numberOfGroups,
  6. KeyGroupRange keyGroupRange) {
  7. return new HeapInternalStateBackend(
  8. numberOfGroups,
  9. keyGroupRange,
  10. env.getUserClassLoader(),
  11. env.getTaskStateManager().createLocalRecoveryConfig(),
  12. env.getTaskKvStateRegistry(),
  13. isUsingAsynchronousSnapshots(),
  14. env.getExecutionConfig()
  15. );
  16. }

代码示例来源:origin: org.apache.flink/flink-streaming-java

  1. private void reportCompletedSnapshotStates(
  2. TaskStateSnapshot acknowledgedTaskStateSnapshot,
  3. TaskStateSnapshot localTaskStateSnapshot,
  4. long asyncDurationMillis) {
  5. TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
  6. boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
  7. boolean hasLocalState = localTaskStateSnapshot.hasState();
  8. Preconditions.checkState(hasAckState || !hasLocalState,
  9. "Found cached state but no corresponding primary state is reported to the job " +
  10. "manager. This indicates a problem.");
  11. // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
  12. // to stateless tasks on restore. This enables simple job modifications that only concern
  13. // stateless without the need to assign them uids to match their (always empty) states.
  14. taskStateManager.reportTaskStateSnapshots(
  15. checkpointMetaData,
  16. checkpointMetrics,
  17. hasAckState ? acknowledgedTaskStateSnapshot : null,
  18. hasLocalState ? localTaskStateSnapshot : null);
  19. LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
  20. owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
  21. LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
  22. owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
  23. }

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

  1. private void reportCompletedSnapshotStates(
  2. TaskStateSnapshot acknowledgedTaskStateSnapshot,
  3. TaskStateSnapshot localTaskStateSnapshot,
  4. long asyncDurationMillis) {
  5. TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
  6. boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
  7. boolean hasLocalState = localTaskStateSnapshot.hasState();
  8. Preconditions.checkState(hasAckState || !hasLocalState,
  9. "Found cached state but no corresponding primary state is reported to the job " +
  10. "manager. This indicates a problem.");
  11. // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
  12. // to stateless tasks on restore. This enables simple job modifications that only concern
  13. // stateless without the need to assign them uids to match their (always empty) states.
  14. taskStateManager.reportTaskStateSnapshots(
  15. checkpointMetaData,
  16. checkpointMetrics,
  17. hasAckState ? acknowledgedTaskStateSnapshot : null,
  18. hasLocalState ? localTaskStateSnapshot : null);
  19. LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
  20. owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
  21. LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
  22. owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
  23. }

代码示例来源:origin: com.alibaba.blink/flink-runtime

  1. @Override
  2. public AbstractInternalStateBackend createInternalStateBackend(
  3. Environment env,
  4. String operatorIdentifier,
  5. int numberOfGroups,
  6. KeyGroupRange keyGroupRange) {
  7. return new HeapInternalStateBackend(
  8. numberOfGroups,
  9. keyGroupRange,
  10. env.getUserClassLoader(),
  11. env.getTaskStateManager().createLocalRecoveryConfig(),
  12. env.getTaskKvStateRegistry(),
  13. isUsingAsynchronousSnapshots(),
  14. env.getExecutionConfig()
  15. );
  16. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

  1. @Override
  2. public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  3. Environment env,
  4. JobID jobID,
  5. String operatorIdentifier,
  6. TypeSerializer<K> keySerializer,
  7. int numberOfKeyGroups,
  8. KeyGroupRange keyGroupRange,
  9. TaskKvStateRegistry kvStateRegistry,
  10. TtlTimeProvider ttlTimeProvider,
  11. MetricGroup metricGroup) {
  12. TaskStateManager taskStateManager = env.getTaskStateManager();
  13. HeapPriorityQueueSetFactory priorityQueueSetFactory =
  14. new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
  15. return new HeapKeyedStateBackend<>(
  16. kvStateRegistry,
  17. keySerializer,
  18. env.getUserClassLoader(),
  19. numberOfKeyGroups,
  20. keyGroupRange,
  21. isUsingAsynchronousSnapshots(),
  22. env.getExecutionConfig(),
  23. taskStateManager.createLocalRecoveryConfig(),
  24. priorityQueueSetFactory,
  25. ttlTimeProvider);
  26. }

代码示例来源:origin: org.apache.flink/flink-runtime

  1. @Override
  2. public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  3. Environment env,
  4. JobID jobID,
  5. String operatorIdentifier,
  6. TypeSerializer<K> keySerializer,
  7. int numberOfKeyGroups,
  8. KeyGroupRange keyGroupRange,
  9. TaskKvStateRegistry kvStateRegistry,
  10. TtlTimeProvider ttlTimeProvider,
  11. MetricGroup metricGroup) {
  12. TaskStateManager taskStateManager = env.getTaskStateManager();
  13. HeapPriorityQueueSetFactory priorityQueueSetFactory =
  14. new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
  15. return new HeapKeyedStateBackend<>(
  16. kvStateRegistry,
  17. keySerializer,
  18. env.getUserClassLoader(),
  19. numberOfKeyGroups,
  20. keyGroupRange,
  21. isUsingAsynchronousSnapshots(),
  22. env.getExecutionConfig(),
  23. taskStateManager.createLocalRecoveryConfig(),
  24. priorityQueueSetFactory,
  25. ttlTimeProvider);
  26. }

代码示例来源:origin: org.apache.flink/flink-runtime

  1. @Override
  2. public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  3. Environment env,
  4. JobID jobID,
  5. String operatorIdentifier,
  6. TypeSerializer<K> keySerializer,
  7. int numberOfKeyGroups,
  8. KeyGroupRange keyGroupRange,
  9. TaskKvStateRegistry kvStateRegistry,
  10. TtlTimeProvider ttlTimeProvider,
  11. MetricGroup metricGroup) {
  12. TaskStateManager taskStateManager = env.getTaskStateManager();
  13. LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
  14. HeapPriorityQueueSetFactory priorityQueueSetFactory =
  15. new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
  16. return new HeapKeyedStateBackend<>(
  17. kvStateRegistry,
  18. keySerializer,
  19. env.getUserClassLoader(),
  20. numberOfKeyGroups,
  21. keyGroupRange,
  22. isUsingAsynchronousSnapshots(),
  23. env.getExecutionConfig(),
  24. localRecoveryConfig,
  25. priorityQueueSetFactory,
  26. ttlTimeProvider);
  27. }

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

  1. @Override
  2. public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
  3. Environment env,
  4. JobID jobID,
  5. String operatorIdentifier,
  6. TypeSerializer<K> keySerializer,
  7. int numberOfKeyGroups,
  8. KeyGroupRange keyGroupRange,
  9. TaskKvStateRegistry kvStateRegistry,
  10. TtlTimeProvider ttlTimeProvider,
  11. MetricGroup metricGroup) {
  12. TaskStateManager taskStateManager = env.getTaskStateManager();
  13. LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
  14. HeapPriorityQueueSetFactory priorityQueueSetFactory =
  15. new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
  16. return new HeapKeyedStateBackend<>(
  17. kvStateRegistry,
  18. keySerializer,
  19. env.getUserClassLoader(),
  20. numberOfKeyGroups,
  21. keyGroupRange,
  22. isUsingAsynchronousSnapshots(),
  23. env.getExecutionConfig(),
  24. localRecoveryConfig,
  25. priorityQueueSetFactory,
  26. ttlTimeProvider);
  27. }

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.11

  1. env.getTaskStateManager().createLocalRecoveryConfig();

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb

  1. env.getTaskStateManager().createLocalRecoveryConfig();

相关文章