我在kubernetes集群中使用带有一个作业管理器pod和两个任务管理器pod的flink集群。当我将流作业提交给作业管理器时,它运行作业并将输出接收到接收器。我还启用了检查点来从失败中恢复。现在,当我有意删除任务管理器pod中的一个以验证flink中的节点故障处理时,我看到一些原本应该到达接收器的记录没有收到。当pod被kubernetes自动重启时,它将继续处理记录,但不会从检查点恢复。我正在使用下面的命令提交作业
flink run -Dparallelism=2 -m localhost:<port> -c <flink job> -p=2 <flink job>.jar
我在工作环境中有以下内容:
env.enableCheckpointing(10000)
env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(Paths.get(<checkpoint path>).toUri, false))
当任务管理器pod重新启动时,我有以下日志。
2020-10-01 10:01:30,096 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 2966c462794bf94523e9a53c1d9a2f13/p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655 from flinksessioncluster-sample-jobmanager/172.20.225.40:6124
但在检查点目录2966c462794bf94523e9a53c1d9a2f13中,我只有以下项目。
chk-299 shared taskowned
我在2966c462794bf94523e9a53c1d9a2f13目录中没有目录p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655
根据文档,任务应该自动从检查点位置恢复。
请告诉我哪里可能是这个问题。
更新
实际进行的试验-
以“t”秒间隔将记录连续插入flink作业。当任务管理器处理记录时,我杀死了一个任务管理器pod。在这个时候,我停止了在flink工作中插入记录。在job的输入端,我向它插入了1000条记录。当任务管理器再次出现时,我的Flume里有700条记录。
现在我开始一次插入一条记录,看到sink中的记录突然增加到940,然后开始增加1,即任务管理器崩溃后插入的记录开始下沉。但在任务管理器崩溃之前插入的最初1000条记录中,我丢失了60条记录
暂无答案!
目前还没有任何答案,快来回答吧!