flink增量检查点—数据在共享文件夹中保留多长时间

li9yvcax  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(784)

我们使用flink1.6.3并将checkpoint保留在ceph中,一次只保留一个checkpoint,使用incremental和rocksdb。
我们以3天的延迟运行windows,这意味着我们期望在3-4天后不会保留checkpoint share文件夹中的任何数据,但是我们看到的数据不止这些
例如
如果今天是7月4日有一些文件从2月4日
有时我们会看到一些检查点,我们假设(由于其索引号不同步)它属于一个被压碎的作业,并且检查点没有用于恢复该作业
我的问题是
为什么我们会从延迟配置中看到较旧的数据
我如何知道这些文件属于有效的检查点,而不是被压碎的作业的检查点?这样我们就可以删除这些文件了

vsnjm48y

vsnjm48y1#

经过调查并在云唐(apache flink用户邮件列表)的协助下
我创建了以下代码
metadatapath—检查点/保存点文件夹中的\u元数据文件的路径
这是在flink 1.6.3版上测试的

DataInputStream in = new DataInputStream(new FileInputStream(metadataPath));
        final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());

        final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
                .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream()
                        .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream()
                                .flatMap(keyedStateHandle -> Stream.concat(((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream(),
                                        ((IncrementalKeyedStateHandle) keyedStateHandle).getPrivateState().values().stream())
                                        .map(streamStateHandle -> {
                                            String name = null;
                                            if (streamStateHandle instanceof FileStateHandle) {
                                                name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
                                            } else {
                                                final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
                                                name = new File(handleName).getName();
                                            }
                                            return name.trim();

                                        })
                                )
                        )
                )
                .collect(Collectors.toSet());
        System.out.println("pathSharedFromMetadata:" + pathSharedFromMetadata);

相关问题