以下是我的Flink检查点配置,我们将S3作为后端。我们正在运行这个flink作业(版本:1.17.0)在EMR集群中
checkpoint-interval : 70000
min-pause-between-checkpoint : 15000
max-concurrent-checkpoint : 1
checkpoint-type: unaligned
TolerableCheckpointFailureNumber :2
Checkpoint-mode : atleast-once
incremental-checkpointing: true
state.backend: filesystem
state.checkpoints.dir: s3p://checkpoint-sp-test/_entropy_/
s3.entropy.key: _entropy_
s3.entropy.length: 1
presto.s3.connect-timeout: 1m
presto.s3.socket-timeout: 1m
presto.s3.max-connections: 6500
state.checkpoints.num-retained: 3
high-availability.type: zookeeper
总作业并行度:5.5K
使用上述配置,理想情况下,flink检查点应该每70秒发生一次,但我看到每个检查点的触发延迟为5-20分钟[随附屏幕截图]。
我也能弄清楚是什么导致了这种延迟。检查点阻塞删除流程在每次检查点调用之前触发。这个检查点删除流程由“state.checkpoints.num-retained”设置控制,如果我设置state.checkpoints.num-retained= 6000000(任意大的数字),所有检查点每隔70秒发生一次。其中一个解决方案是在检查点桶中添加S3生命周期规则,以便在1天后清除所有检查点。
上面的方法可以工作,但是设置“state.checkpoints.num-retained”为非常大的数字会有副作用。我看到了新的工作经理选举的副作用。每当选择新的作业管理器时,它首先尝试获取在“state.checkpoints.num-retained”设置上配置的检查点数量。如果它设置为大,它只是在努力获取无限数量的旧检查点,作业永远无法启动。
任何建议,为解决这个问题将非常感谢!!
2条答案
按热度按时间mm5n2pyu1#
首先,Flink会定期删除旧的检查点以释放存储空间。此清理过程由state.checkpoints.num-retained设置控制。当你将其设置为非常大的数字时,Flink会保留大量的检查点,因此,清理过程可能需要很长时间,因为它需要检查并可能删除大量的旧检查点。当新的作业管理器被选中时,它需要恢复Flink作业的状态。这涉及从保留的检查点获取元数据和可能的一些数据以恢复状态。如果您将state.checkpoints.num-retained设置为一个非常大的数字,作业管理器在恢复过程中可能会遇到困难,因为它必须处理大量的检查点,这可能会非常耗时且占用大量资源。
我建议您使用S3生命周期规则设置一个合理的数量,以便在一段时间后自动清理旧的检查点。
5ssjco0h2#
在我看来,您选择了文件系统状态后端,它不支持增量检查点(尽管您设置了配置)。所以每个检查点都是完整的检查点。而且你有相当高的并行度(5500),与许多有状态操作符相结合。这意味着每个检查点将包含成千上万的文件,这不是S3可以快速处理的。
RocksDB状态后端在处理这么多状态方面做得更好,但速度较慢。