Usage and code examples of the org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore class

x33g5p2x, reposted on 2022-02-05 in Other

This article collects code examples for the Java class org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore and shows how the class is used. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as a practical reference. Details of the ZooKeeperCompletedCheckpointStore class are as follows:
Package path: org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
Class name: ZooKeeperCompletedCheckpointStore

Introduction to ZooKeeperCompletedCheckpointStore

CompletedCheckpointStore for JobManagers running in HighAvailabilityMode#ZOOKEEPER.

Checkpoints are added under a ZNode per job:

+----O /flink/checkpoints/<job-id>  [persistent] 
.    | 
.    +----O /flink/checkpoints/<job-id>/1 [persistent] 
.    .                                  . 
.    .                                  . 
.    .                                  . 
.    +----O /flink/checkpoints/<job-id>/N [persistent]

During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, only the latest one is used and older ones are discarded (even if the maximum number of retained checkpoints is greater than one).
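The layout and the recovery rule above can be illustrated without ZooKeeper. The following is a minimal sketch, not the Flink implementation: the class name `CheckpointPathSketch` and the zero-padded path format are assumptions made for illustration (the excerpts below call `checkpointIdToPath` and `pathToCheckpointId` but do not show their bodies). Padding the ID to a fixed width makes the lexicographic order of the ZNode names match the numeric order of the checkpoint IDs, so the latest checkpoint is the one with the highest ID.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.OptionalLong;

final class CheckpointPathSketch {

    // Assumed mapping: zero-pad the ID to 19 digits so names sort in ID order.
    static String checkpointIdToPath(long checkpointId) {
        return String.format(Locale.ROOT, "/%019d", checkpointId);
    }

    // Inverse of the mapping above: strip the leading slash and parse the number.
    static long pathToCheckpointId(String path) {
        String number = path.startsWith("/") ? path.substring(1) : path;
        return Long.parseLong(number);
    }

    // Returns the highest checkpoint ID among the child paths, if there is one.
    static OptionalLong latestCheckpointId(List<String> childPaths) {
        return childPaths.stream()
                .mapToLong(CheckpointPathSketch::pathToCheckpointId)
                .max();
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                checkpointIdToPath(1), checkpointIdToPath(12), checkpointIdToPath(7));
        System.out.println(latestCheckpointId(children).getAsLong()); // prints 12
    }
}
```

Only the selection of the latest ID is shown here; locking and discarding the older entries are left out.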

If there is a network partition and multiple JobManagers run concurrent checkpoints for the same program, it is OK to take any valid successful checkpoint as long as the "history" of checkpoints is consistent. Currently, after recovery we start out with only a single checkpoint to circumvent those situations.

Code examples

Code example source: origin: org.apache.flink/flink-runtime (the same code also appears in org.apache.flink/flink-runtime_2.11)

/**
 * Tries to remove the checkpoint identified by the given checkpoint id.
 *
 * @param checkpointId identifying the checkpoint to remove
 * @return true if the checkpoint could be removed
 */
private boolean tryRemove(long checkpointId) throws Exception {
  return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
}

Code example source: origin: org.apache.flink/flink-runtime (the same code also appears in org.apache.flink/flink-runtime_2.11)

private static CompletedCheckpoint retrieveCompletedCheckpoint(Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandlePath) throws FlinkException {
  // f1 holds the ZooKeeper path, f0 the handle that points to the stored checkpoint.
  long checkpointId = pathToCheckpointId(stateHandlePath.f1);

  LOG.info("Trying to retrieve checkpoint {}.", checkpointId);

  try {
    return stateHandlePath.f0.retrieveState();
  } catch (ClassNotFoundException cnfe) {
    throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " +
      stateHandlePath.f1 + ". This indicates that you are trying to recover from state written by an " +
      "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
  } catch (IOException ioe) {
    throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " +
      stateHandlePath.f1 + ". This indicates that the retrieved state handle is broken. Try cleaning the " +
      "state handle store.", ioe);
  }
}
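The exception handling in retrieveCompletedCheckpoint follows a common pattern: low-level failures are wrapped in a single domain exception that keeps the original cause and adds a hint about what went wrong. A minimal, Flink-free sketch of that pattern is shown below. `RetrieveSketch`, `RecoveryException`, and `StateLoader` are hypothetical names introduced for illustration; they stand in for the enclosing class, `FlinkException`, and the state handle and are not Flink types.

```java
import java.io.IOException;

final class RetrieveSketch {

    // Hypothetical domain exception, standing in for FlinkException.
    static final class RecoveryException extends Exception {
        RecoveryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical loader, standing in for the state handle's retrieval call.
    @FunctionalInterface
    interface StateLoader<T> {
        T load() throws IOException, ClassNotFoundException;
    }

    // Loads the state and translates low-level failures, keeping the cause.
    static <T> T retrieve(long checkpointId, StateLoader<T> loader) throws RecoveryException {
        try {
            return loader.load();
        } catch (ClassNotFoundException e) {
            throw new RecoveryException(
                    "Could not retrieve checkpoint " + checkpointId + ": incompatible state.", e);
        } catch (IOException e) {
            throw new RecoveryException(
                    "Could not retrieve checkpoint " + checkpointId + ": broken state handle.", e);
        }
    }

    public static void main(String[] args) throws RecoveryException {
        System.out.println(retrieve(1L, () -> "state")); // prints state
    }
}
```

Callers then handle one exception type, while the original cause stays available through `getCause()`.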

Code example source: origin: com.alibaba.blink/flink-runtime (the same code also appears in org.apache.flink/flink-runtime_2.10)

/**
 * Synchronously writes the new checkpoints to ZooKeeper and asynchronously removes older ones.
 *
 * @param checkpoint Completed checkpoint to add.
 */
@Override
public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception {
  checkNotNull(checkpoint, "Checkpoint");
  final String path = checkpointIdToPath(checkpoint.getCheckpointID());
  // Now add the new one. If it fails, we don't want to lose existing data.
  checkpointsInZooKeeper.addAndLock(path, checkpoint);
  completedCheckpoints.addLast(checkpoint);
  // Everything worked, let's remove a previous checkpoint if necessary.
  while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
    try {
      removeSubsumed(completedCheckpoints.removeFirst());
    } catch (Exception e) {
      LOG.warn("Failed to subsume the old checkpoint", e);
    }
  }
  LOG.debug("Added {} to {}.", checkpoint, path);
}
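The retention loop in addCheckpoint can be tried out in isolation. The sketch below is an illustration under stated assumptions, not the Flink implementation: `RetentionSketch` is a hypothetical class, plain checkpoint IDs stand in for `CompletedCheckpoint` objects, and the ZooKeeper write and the discard step are left out. It shows the order of operations used above: the new entry is added first, and only then are the oldest entries evicted until the size is back within the limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class RetentionSketch {

    private final ArrayDeque<Long> completedCheckpoints = new ArrayDeque<>();
    private final int maxNumberOfCheckpointsToRetain;

    RetentionSketch(int maxNumberOfCheckpointsToRetain) {
        if (maxNumberOfCheckpointsToRetain < 1) {
            throw new IllegalArgumentException("Must retain at least one checkpoint.");
        }
        this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
    }

    // Adds the new ID first, then evicts the oldest IDs; returns the evicted ones.
    List<Long> addCheckpoint(long checkpointId) {
        completedCheckpoints.addLast(checkpointId);
        List<Long> subsumed = new ArrayList<>();
        while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
            subsumed.add(completedCheckpoints.removeFirst());
        }
        return subsumed;
    }

    // Snapshot of the retained IDs, oldest first.
    List<Long> retained() {
        return new ArrayList<>(completedCheckpoints);
    }

    public static void main(String[] args) {
        RetentionSketch store = new RetentionSketch(2);
        store.addCheckpoint(1);
        store.addCheckpoint(2);
        store.addCheckpoint(3);
        System.out.println(store.retained()); // prints [2, 3]
    }
}
```

Adding before evicting means that a failed write never shrinks the set of retained checkpoints.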

Code example source: origin: org.apache.flink/flink-runtime_2.11 (the same code also appears in org.apache.flink/flink-runtime)

/**
 * Synchronously writes the new checkpoints to ZooKeeper and asynchronously removes older ones.
 *
 * @param checkpoint Completed checkpoint to add.
 */
@Override
public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception {
  checkNotNull(checkpoint, "Checkpoint");
  final String path = checkpointIdToPath(checkpoint.getCheckpointID());
  // Now add the new one. If it fails, we don't want to lose existing data.
  checkpointsInZooKeeper.addAndLock(path, checkpoint);
  completedCheckpoints.addLast(checkpoint);
  // Everything worked, let's remove a previous checkpoint if necessary.
  while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
    final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
    tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
  }
  LOG.debug("Added {} to {}.", checkpoint, path);
}

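Code example source: origin: org.apache.flink/flink-runtime_2.11 (the same excerpt also appears in org.apache.flink/flink-runtime and org.apache.flink/flink-runtime_2.10)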

return new ZooKeeperCompletedCheckpointStore(
  maxNumberOfCheckpointsToRetain,
  client,
  // (the remaining constructor arguments are truncated in the source excerpt)

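Code example source: origin: org.apache.flink/flink-runtime_2.11 (the same excerpt also appears in org.apache.flink/flink-runtime and org.apache.flink/flink-runtime_2.10)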

completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
  retrievedCheckpoints.add(completedCheckpoint);
  // (the excerpt ends here; the rest of the block is truncated in the source)

Code example source: origin: org.apache.flink/flink-runtime (the same code also appears in org.apache.flink/flink-runtime_2.11)

private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
  try {
    if (tryRemove(completedCheckpoint.getCheckpointID())) {
      executor.execute(() -> {
        try {
          discardCallback.accept(completedCheckpoint);
        } catch (Exception e) {
          LOG.warn("Could not discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
        }
      });
    }
  } catch (Exception e) {
    LOG.warn("Failed to subsume the old checkpoint", e);
  }
}
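tryRemoveCompletedCheckpoint separates two steps: the entry is removed from the store synchronously, and the potentially slow discard of the checkpoint data runs on an executor only if that removal succeeded. The following is a minimal sketch of this split, assuming hypothetical names (`AsyncDiscardSketch`, `ThrowingDiscard`) and a plain list in place of the logger. The usage example passes `Runnable::run` as a same-thread executor so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

final class AsyncDiscardSketch {

    // Hypothetical stand-in for a consumer that may throw a checked exception.
    @FunctionalInterface
    interface ThrowingDiscard {
        void accept(long checkpointId) throws Exception;
    }

    // Schedules the discard only if the entry was removed from the store.
    // The log list must be thread-safe if the executor uses other threads.
    static void discardIfRemoved(
            boolean removedFromStore,
            long checkpointId,
            ThrowingDiscard callback,
            Executor executor,
            List<String> log) {
        if (!removedFromStore) {
            return; // the entry could not be removed, so its data is kept
        }
        executor.execute(() -> {
            try {
                callback.accept(checkpointId);
                log.add("discarded " + checkpointId);
            } catch (Exception e) {
                // A failed discard is recorded but never propagated to the caller.
                log.add("could not discard " + checkpointId + ": " + e.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Executor sameThread = Runnable::run;
        discardIfRemoved(true, 7L, id -> { }, sameThread, log);
        discardIfRemoved(false, 8L, id -> { }, sameThread, log);
        System.out.println(log); // prints [discarded 7]
    }
}
```

Because the callback runs inside the executor task, an exception from the discard affects only that task, which matches the warn-and-continue handling in the excerpt above.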

Code example source: origin: org.apache.flink/flink-runtime_2.11 (the same code also appears in org.apache.flink/flink-runtime)

@Override
public void shutdown(JobStatus jobStatus) throws Exception {
  if (jobStatus.isGloballyTerminalState()) {
    LOG.info("Shutting down");
    for (CompletedCheckpoint checkpoint : completedCheckpoints) {
      tryRemoveCompletedCheckpoint(
        checkpoint,
        completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
    }
    completedCheckpoints.clear();
    String path = "/" + client.getNamespace();
    LOG.info("Removing {} from ZooKeeper", path);
    ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
  } else {
    LOG.info("Suspending");
    // Clear the local handles, but don't remove any state
    completedCheckpoints.clear();
    // Release the state handle locks in ZooKeeper such that they can be deleted
    checkpointsInZooKeeper.releaseAll();
  }
}
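The two branches of shutdown differ in what happens to the entries stored remotely. A simplified, Flink-free sketch of that decision follows. `ShutdownSketch` and its nested `Status` enum are hypothetical; the enum models only whether a status is globally terminal, and the set `remoteEntries` stands in for the ZNodes. The sketch removes every remote entry on a globally terminal status and does not model the per-checkpoint discard callback or the lock release.

```java
import java.util.ArrayDeque;
import java.util.Set;
import java.util.TreeSet;

final class ShutdownSketch {

    // Hypothetical status enum; only the globally-terminal flag is modeled.
    enum Status {
        FINISHED(true), CANCELED(true), FAILED(true), SUSPENDED(false);

        private final boolean globallyTerminal;

        Status(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    final ArrayDeque<Long> localHandles = new ArrayDeque<>();
    final Set<Long> remoteEntries = new TreeSet<>(); // stands in for the ZNodes

    void add(long checkpointId) {
        localHandles.addLast(checkpointId);
        remoteEntries.add(checkpointId);
    }

    void shutdown(Status status) {
        if (status.isGloballyTerminalState()) {
            // The job will not run again: remove the remote entries as well.
            remoteEntries.clear();
        }
        // In both cases this instance forgets its local handles.
        localHandles.clear();
    }

    public static void main(String[] args) {
        ShutdownSketch store = new ShutdownSketch();
        store.add(1);
        store.add(2);
        store.shutdown(Status.SUSPENDED);
        System.out.println(store.remoteEntries); // prints [1, 2]
        store.shutdown(Status.FINISHED);
        System.out.println(store.remoteEntries); // prints []
    }
}
```

Keeping the remote entries on suspension is what allows another instance to recover the job from them later.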

Code example source: origin: com.alibaba.blink/flink-runtime (the same code also appears in org.apache.flink/flink-runtime_2.10)

@Override
public void shutdown(JobStatus jobStatus) throws Exception {
  if (jobStatus.isGloballyTerminalState()) {
    LOG.info("Shutting down");
    for (CompletedCheckpoint checkpoint : completedCheckpoints) {
      try {
        removeShutdown(checkpoint, jobStatus);
      } catch (Exception e) {
        LOG.error("Failed to discard checkpoint.", e);
      }
    }
    completedCheckpoints.clear();
    String path = "/" + client.getNamespace();
    LOG.info("Removing {} from ZooKeeper", path);
    ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
  } else {
    LOG.info("Suspending");
    // Clear the local handles, but don't remove any state
    completedCheckpoints.clear();
    // Release the state handle locks in ZooKeeper such that they can be deleted
    checkpointsInZooKeeper.releaseAll();
  }
}
