转载:美团 Flink 大作业部署与状态稳定性优化实践 我觉得这篇文章有价值。
美团 Flink 的应用场景覆盖了社区定义的三种场景:
随着业务的发展和实时计算的迭代,业务对实时计算的应用越来越广泛,也越来越深入。当前我们有将近 5 万个作业部署在超过 15,000 台机器上,高峰期处理的流量达到了 5.4 亿条/秒,这几个指标相比往年都有了很大的增长。除了整体的规模增长,今年我们也遇到了单作业规模大幅增长的情况,目前我们的大作业并发度达到 5000,状态达到了 10 TB。单作业规模的增长也给我们带来了新的问题和挑战。
首先是作业启动的问题。以往在小规模作业上不那么明显的问题,在大作业部署启动流程中暴露了出来,比如 Task 启动慢部署慢、分布不均,大作业对 HDFS 的影响等。另外在状态方面,状态很大的时候同样给作业带来了不可忽略的影响,主要是 Savepoint 的制作开销和恢复效率,以及状态的容灾等问题。
我们当前的大作业算子并发达到了 5000,拓扑复杂度上算子数量达到了 8000,有两层的数据 shuffle 交换,资源上作业需要超过 1000 个 TaskManager,在这样的规模下,我们遇到了一些之前没有遇到过的问题。
首先,部署大量 Task 的时候会遇到部署时间长或因为 RPC 超时而部署失败的问题;
此外,Task 分布不够合理,部分 TaskManager 中 Network Buffer 的数量不足,会导致作业启动失败;
另外,大作业做 Checkpoint 期间,会给 HDFS 带来瞬时压力,也会影响其他作业使用 HDFS。
针对上述问题我们进行了以下优化:
我们首先分析了 JobManager 视角的作业部署流程,希望能搞清楚各个环节有哪些因素影响部署,以及他们是如何影响部署的,然后对症下药去解决问题。
可以看到,从收到 JobGraph 到启动所有 Task,主要环节有构建执行图、申请资源、部署 Task、启动 Task 这几个步骤。
当我们重点关注部署和启动 Task 步骤的时候,我们发现了以下几个现象:首先 JobManager 所在的机器的网卡在部署期间会被打满;从 TaskManager 日志中也可以看到,下载 userjar 操作花的时间比较长;经过对比测试,我们发现 userjar 缩小体积之后就没有 RPC 请求超时的现象了,部署耗时也有所减少。所以我们研究了 userjar 的下载流程,每个 Task 线程在启动的时候需要从用户的 jar 包中加载用户代码的类加载器,这一步需要从 JobManager 的 BlobServer 中下载 userjar。
Flink 当前的实现中,每个 TaskManager 都需要下载一次 userjar,TaskManager 的多个 Task 会解析同一个 jar 包。当作业开启 ha 的时候,TaskManager 会从 HDFS 中读取 userjar,从而减轻 JobManager 的分发压力。当 userjar 比较大,数量又比较多,且没有开启 ha 的情况下,JobManager 分发 userjar 的压力就会很大,会导致阻塞网络甚至打满网卡。
明确了原因,我们就可以对 userjar 的分发进行优化。我们当前部署是 yarn session 模式,为了提交作业时能复用 session cluster 达到快速部署的目的,也为了同一个 session cluster 中提交多个 job 的时候都能有优化效果,我们没有利用 yarn 的机制来分发 userjar,而是在 Flink 层面做了优化。
我们让同一个节点上的 TaskManager 只下载一次 userjar,该节点上所有 TaskManager 共享这次下载结果,因为 userjar 的下载次数从 TaskManager 粒度下降到了机器粒度,降低了一个数量级,大幅减小了 JobManager 的分发压力。
优化了 userjar 的分发问题,我们还发现,在有 shuffle 而且并发度比较大的作业上,部署过程中仍然有存在一些 RPC 超时的情况,而且 JobManager 上有大量的 requestPartitionState 请求。这是因为下游 Task 启动的时候会检查上游 Task 的 partition 是否就绪,如果还没有就绪,下游会请求 JobMaster 去询问上游 Task 的状态,判断是否需要继续请求上游 Task 的 partition。当作业规模很大的时候,很容易出现不同 Task 启动速度不一致的情况,导致 JobMaster 上出现大量的 requestPartitionState 请求。
对于这个问题我们做了一个简单的优化。下游 Task 请求 partition 失败的时候,先自己尝试重试几次,而不是立即请求 JobMaster。通过这个调整,大幅减少了 JobMaster 上 requestPartitionState 的 RPC 请求量,使得 JobMaster 可以有更多时间去处理其他的 RPC 请求。
经过以上两步优化,JobManager 的分发压力大幅减小。上图是分发 userjar 的优化效果图,可以看到作业规模越大,优化效果越明显。此外在当前规模下,我们也消除了 RPC 超时的异常,使得大作业可以成功部署。
我们再来看下 Task 分布不均的问题。
我们发现,在大作业部署过程中,Task 没有均匀分布在所有 TaskManager 中,这会导致部分 TaskManager 出现 Network Buffer 不足的情况,导致作业启动失败。我们虽然能够通过增加总内存、调整不同内存的占比来临时解决这个问题,但是这种方法并不能解决本质的问题,反而会加剧资源的浪费。因为不同 TaskManager 中的 Task 数量和类型不同,实际所需要的资源量也不同的,但是 TaskManager 都是统一按照最大的资源量来申请的,这会导致很多 TaskManager 申请了比实际需要更多的资源。另一方面 Task 比较集中的 TaskManager 计算压力更大,也更容易成为整个作业的计算瓶颈。
我们总结了两类 Task 分布不均的问题:
Task 的数量分布不均,主要是因为 SlotSharing 机制允许不同算子的不同 Task 使用同一个 Slot,但是在 Task 选择 Slot 的时候,并没有考虑 Slot 里 Task 数量的分布情况,导致多个 Task 集中在一个 Slot 里,进而导致 Task 集中在同一个 TaskManager 中。出现这种情况通常是业务有一些多 source 多 sink 的作业来做一些流量分发、汇聚的操作。
针对数量分布不均的问题,我们对 Task 选择 Slot 的策略做了一些优化,新的选择策略如下:对于无上游的 Task,尽量把它分配到新的 Slot 上,直到 Slot 数达到上限;对于有上游的 Task,优先选择把它放在与上游 Task 相同的 Slot 里,减少不必要的数据分发;有多个可选 Slot 的时候,优先选择 Task 数量少的 Slot。经过这样的优化,Task 在 Slot 间的分布就比较均匀了。上图可以看到之前集中在 1 个 Slot 里面的 4 个 source 算子,现在已经比较均匀地分布在不同的 Slot 里面了。
再来看下 Task 类型分布不均的问题。我们知道,一个 Slot 内不可能有多个相同类型的 Task,只有包含相同类型 Task 的多个 Slot 集中在某个 TaskManager 中,才会导致 Task 类型集中,而 Slot 选择 TaskManager 受申请顺序的影响。当前,Slot 申请顺序是随机的,并没有考虑 Task 类型的分布情况,这就会导致相同类型的 Task 集中在同一个 TaskManager 中。这个问题也比较普遍,只要作业不同算子的并发度不一致,就有可能出现这个问题。
针对类型分布不均的情况,我们对当前 Slot 的申请顺序做了优化。按照 Slot 中 Task 类型组合的情况,对 Slot 申请顺序进行调整,包含相同 Task 类型组合的 Slot,尽量分布在不同的 TaskManager 中,比如包含 Source、Process、Sink 的这三个算子的 Slot,被我们均匀的分散到了不同的 TaskManager 中。
上述两个优化是独立的,组合使用可以得到更好的优化效果,如上图。
以我们线上的大作业来验证,可以看到在 Task 的数量分布方面,优化后每个 TaskManager 统一都有 6 个 Task,分布很均匀;资源和负载方面,因为 Task 数量和类型的均衡,TaskManager 中 Network Buffer、数量和 CPU 使用率,从之前的有高有低变得比较一致了
最后我们再来看下 HDFS 的压力问题。导致 HDFS 压力的原因有两个:
可以看出,HDFS 的压力主要来自于 NameNode。
我们应对 NameNode 压力的方法也很直接。首先在底层部署了多组 HDFS NameNode,这样可以在底层资源上做到水平扩展。在引擎层,我们提供了多组任务的均衡策略,来决定作业使用哪一组 NameNode。之后,再通过动态指定相关的路径,使作业真正使用不同的 NameNode。最终使得 HDFS 的服务能力可水平扩展,大作业的部署运行也不再影响其他作业。
除了上面的优化,我们还做了一些其他方面的优化项来帮助大作业更好地部署运行。我们向用户开放了 Flink 的运行参数,用户可以针对自己作业的运行特点做个性化的调优、我们限制了 Checkpoint 最小制作间隔来避免不合理的高频 Checkpoint 制作影响集群上的其他作业。
在美团,我们的 Flink 计算资源会有多机房交互,同一个项目组可能在不同的机房都有计算资源,因此作业有换机房启动的场景。基于过往的经验,我们更倾向于使用 Retained Checkpoint 而不是 Savepoint 去重启作业。因为我们主要使用 RocksDBStateBackend 的增量 Checkpoint,相对于 Savepoint,它的制作和恢复效率会更高,间隔配置更短,从状态恢复时需要回溯的数据也更少。此外,一些业务的重要作业要求具备更高的状态容灾能力,整个机房故障时也要能够切换到其他机房运行。最后,虽然我们在 2020 年支持的 Savepoint 跨机房副本能够解决部分上面提到的问题,但由于业务越来越倾向于使用 Retained Checkpoint 来恢复作业,这个功能也就无法再满足需求。
从上述背景中我们可以提炼出两个目标:
通过分析,我们需要分两步来完成上述目标。
Checkpoint self contained & relocatable 特性主要是为了让 Checkpoint 能够被移动和复制。其实 Flink 在 1.11 版本已经在 Savepoint 上支持了这个特性,但 Checkpoint 的情况比较复杂所以还没有支持。为了讲清楚这个问题,需要先了解 Checkpoint 目录结构。
首先是 checkpoints/{job-id} 目录,也称为 exclusive 目录,每个 Checkpoint 的 id 会对应一个 exclusive 目录,用于存放每个 checkpoint 的独有的文件。其次是 shared 目录,用于存放各个 Checkpoint 之间会共享的文件。最后是 taskowned 的目录,用于存放永远不能由 JM 删除的文件。
每个 Checkpoint 都拥有一个 metadata 文件,里面保存了 Checkpoint 元数据。
另外就是独有的状态文件和 Checkpoint 间共享的文件,如上图橘红色的线条表示,Checkpoint 文件中包含了对 exclusive 文件和 shared 文件的引用,通过 metadata 文件,就能找到一个 Checkpoint 所需要的所有文件。
至此,就可以说明 Checkpoint 不是 self-contained 的含义了。如上图,在一些情况下一个 Checkpoint 的 metadata 文件引用了其他作业实例的 Checkpoint 的 shared 文件,这里不同的作业实例可能是同一个作业的代码多次部署,每个 Flink job 的 id 对应一个作业实例。
这种情况常发生在作业从增量的 Checkpoint 恢复时,如图 job1、job2、job3 是同一个作业代码多次启动,job3 启动时从 job2 留下的 Checkpoint 恢复,job2 启动时又是从 job1 留下的 Checkpoint 恢复,结果就是形成一个长长的引用链。实际生产应用中这种情况非常常见,我们经常会在调整作业参数、修改代码等操作之后,再从 retained Checkpoint 重启作业。
上述情况会带来两个问题:
首先需要说明的是,上述讨论虽然没有局限于某一个具体的 StateBackend,但实际上这种问题主要是 RocksDBStateBackend 的增量 Checkpoint 导致的,这是我们在生产环境中默认使用的 Backend 和 Checkpoint 方式。所以我们先来看看 RocksDBStateBackend 的增量 Checkpoint。
RocksDB 是一个基于 LSM Tree 的 KV 存储引擎,它会将持久化数据写到磁盘文件中。上图是一个 RocksDB 实例的文件目录结构,可以分为两类:
开始制作 Checkpoint 时,RocksDBStateBackend 会先将数据刷盘,然后将 DB 实例中所有文件上传到指定的 Checkpoint storage 中,在我们的场景里就是上传到 HDFS。如左图,假如在制作 Checkpoint3 时,DB 中有 123 这三个 SST 文件,这些 SST 文件由于不会被修改,就有可能会被后续的增量 Checkpoint 直接使用,因而会被放到 shared 目录下。而所有的 meta 文件都会被放到 exclusive 目录下。
如果过了一段时间,我们开始基于 Checkpoint3 去制作后面的 Checkpoint5。这里会存在一个疑问,Checkpoint3 之后为什么是 Checkpoint5,而不是 Checkpoint4?这是因为这里可能会插入一个 Savepoint,而 Savepoint 要占用 Checkpoint 序号。
制作 Checkpoint5 时,DB 实例下的文件情况如上图右边所示,新增了 04.sst 减少 01.sst。由于是增量 Checkpoint,这时候只需要将 metadata 文件和 04.sst 进行上传,而 02.sst 和 03.sst 只需要在 metadata 中记录文件引用,不需要重复上传。
那么制作 Checkpoint5 的时候是如何知道 02.sst 和 03.sst 已经上传过了?其实是通过一个 previous-sst-list 来记录的,里面记录了上次成功的 Checkpoint 中所有 sst 文件信息,这样就可以不断基于 previous-sst-list 来进行增量的 Checkpoint 制作。
当作业基于增量 Checkpoint 恢复时,如上图所示首先会根据 Checkpoint 信息去恢复 previous-sst-list,然后去构造 RocksDB 实例,就是将 meta 文件和 sst 文件下载到对应的位置。这样即使是启动后的第一个 Checkpoint,也可以基于 restored Checkpoint 进行增量制作。
但这也就意味着新启动作业的 Checkpoint 可能会引用它所 restore 的 Checkpoint 中的文件, 这正是前面提到的 Checkpoint 跨作业实例文件引用的根本原因。
清楚了原理以后,改动方式也就变得清晰了,只需要在恢复 previous-sst-list 之前,判断 restore Checkpoint 所属的作业是不是当前作业,如果是,就恢复 previous-sst-list;如果不是,就说明是新作业从 retained Checkpoint 启动,不恢复 previous-sst-list。不恢复的话,作业启动之后的第一个 Checkpoint 就会上传所有文件,再之后的 Checkpoint 才会基于前面的 Checkpoint 进行增量制作,这也就不会存在跨作业文件引用的问题了。
新的问题是如何知道 restore 的 Checkpoint 所属的作业 ID 呢?上图描述了 Checkpoint metadata 结构,从中我们无法获取到作业 ID,那就要想办法把作业 ID 放进去。关键就在于红框中的 keyed state handle,它有多种不同的实现,每种不同的实现代表一种 Checkpoint 或者 Savepoint 的方式。
如果是 IncrementalRemoteKeyedStateHandle,说明这是一个 RocksDBStatebackend 的增量 Checkpoint,所以只要给 IncrementalRemoteKeyedStatehandle 增加一个 jobID 字段,在制作 Checkpoint 时把 ID 字段也序列化到 meta 文件中,这样在 restore 的时候就可以知道 Checkpoint 所属的 job ID 了。
再来看一下 relocatable 的问题, 如上图,metdata 中会记录 exclusive 文件和 shared 文件的引用,其实就是记录了一个文件的绝对路径。而当 Checkpoint 被整个复制到其他目录时,这些引用就失效了。解决的方法也很简单,就是将绝对路径换成相对路径,这样就能根据 Checkpoint 的 exclusive 目录和文件的相对路径计算出文件的具体位置,这样 Checkpoint 就不怕被移来移去了。到这里,Checkpoint relocatable 的问题就解决了。
有了 self-contaiend & relocatable,我们的 Checkpoint 就支持把副本制作到任何地方了,这样我们也才能开始 Checkpoint 跨机房副本制作能力的支持。
最初在评估如何完成副本跨机房制作能力的时候,有几个备选方案:
第一就是像支持 Savepoint 副本制作一样,通过 distcp 对整个 Checkpoint 目录进行跨机房复制,这种方式在复制 Savepoint 时工作良好。但由于 distcp 的每个复制任务都会启动一个很重的 mapreduce 作业,而 Checkpoint 又比 Savepoint 频繁得多,而且 distcp 过程中作业还在运行,可能会不断有文件在复制过程中被删除,虽然可以配置为忽略,但也会导致一些其他问题,因此不太合适;
第二就是编写一个 Checkpoint Replicate Service,连接多个 HDFS 集群,专门用于 Checkpoint 的副本制作,这也是我们最后选择的方式;
第三是通过改造 Flink 引擎,在制作 Checkpoint 时直接将数据双写到两个 HDFS 集群上,但是这种方式无疑会给引擎增加不稳定的因素,不能为了应对小概率的机房故障而放弃作业运行的稳定性和效率;
最后就是改造 Flink 的 Checkpoint coordinator,使其在制作 Checkpoint 完成后触发一次 distcp,在 distcp 完成前不触发后续的 Checkpoint 制作。这种方式可以避免方案 1 中提到的 distcp 复制过程中文件变动的问题,但也是由于 distcp 效率原因而被放弃。
Checkpoint Replicate Service 的实现方式如下:每一个节点会持有多个 HDFS client,上图中以橘色和紫色来区分两个 HDFS 集群和 HDFS client,分别是运行的集群和副本要制作的集群。在进行副本制作时,通过原集群的 HDFS client 读取文件,传给目标集群的 HDFS client,将文件写入目标集群。
如图我们要对 Checkpoint5 进行副本制作时,首先读取 Checkpoint5 的 metadata 文件,解析出引用的所有文件得到 referencedFiles,再加上 metadata 文件,就是我们要复制到目标集群的所有文件。通过 replicate service 将这些文件复制到目标集群的对应位置上,再加上前面介绍的 Checkpoint self-contained&relocatable 特性,我们就在目标集群上得到了一个可用的 Checkpoint 副本。
这里有一个疑问,运行过程中 Checkpoint 不断完成,后续的副本制作是否也能够像 Checkpoint 一样进行增量制作,答案是肯定的。
如图我们假设制作 Checkpoint5 的副本时,目标集群对应位置上已经存在了 Checkpoint3,这时我们就可以根据 Checkpoint3 的副本来进行增量副本制作。先读取 metadata 文件,解析出引用文件列表 referenceFiles3,然后对这两个文件列表进行集合运算,就知道如何进行增量的副本制作了。
第一部分,只存在于 Checkpoint5 中的文件是新增的文件,需要复制到目标集群中去;
第二部分,只存在于 Checkpoint3 中的文件是在新 Checkpoint 过程中被删除的,由于副本集群只需要保留最新的 Checkpoint3,这部分文件会被直接删除;
最后是相交的部分,这些文件虽然被 Checkpoint5 所需要,但已经被上传过了,因此可以忽略。通过这种方式,我们就能像 Checkpoint 增量制作一样去进行增量的副本制作。
我们在实际工程实践上也获得了不少的经验:
第一点是需要改造 Flink 引擎的 metadata 解析过程。当前的实现会在解析过程中去访问 metadata 文件所在的 HDFS,由于使用的不是我们指定的 HDFS client,可能就会因 metadata 文件所在集群不是副本服务默认连接的集群而导致解析失败。但其实这个访问不是必须的,因此我们在解析服务中将这个访问直接移除。
第二点是考虑缓存 metadata 的解析结果。生产上的大状态作业,一个 metadata 可能有几十 M (甚至几个 G),引用文件会达到几十万个,解析时间可能需要分钟级别,而增量制作副本时会有多次解析同一个 metadata,因此可以考虑把解析结果缓存起来。
第三点是引用文件的复制和删除可以拆分成多个批次发送到多个节点上并行执行。这是因为大状态的作业一个 Checkpoint 复制的文件量可能就达到了 10TB+,很容易达到一台机器的网络瓶颈。
最后还有两个小建议:
首先是运行中的作业副本制作失败时不需要进行重试,主要是考虑到运行中的作业会不断有更新的 Checkpoint 产生,新 Checkpoint 复制成功的意义要大于旧 Checkpoint 的复制;
此外,files-to-delete 的执行可以异步进行,即使失败了也只是多一些无用的文件残留,不影响副本的可用;只要保证最终有兜底策略进行清理就行。
状态稳定性方面,我们还进行了另外三个方面的优化:
第一,修复了一个 RocksDBStateBackend 的内存泄露问题,这个问题触发的条件是作业发生了 restart,并且 restart 之后会复用没有退出的 TM。同时 TM 的 heap 内存又很充足,full gc 很不频繁。上图可以看到我们定位的一个 TM 的内存变化,图中两次 restart 之后,该 TM 的内存都增长了 4G 左右,后续如果再发生 restart,就会导致 TM 内存超用。
导致这个问题得原因是 RocksDBStateBackend 清理过程中存在 bug,有一处 RocksObject 没有被清理,进而导致 restart 前 RocksDB 实例的 native 内存释放不了。
第二,Savepoint 之后的第一个增量 Checkpoint 会退化成全量 Checkpoint,会上传所有 RocksDB 文件。上图可以看到红框中是一个 Savepoint,而黄框中是紧跟其后的一个 Checkpoint,这个 Checkpoint 上传了将近 800G 文件,明显大于之后正常的 Checkpoint。
导致这个问题的原因是 Savepoint 制作完成后错误清理了 previous-sst-list,我们已将修复提交给社区,需要的同学可以升级到对应的版本。
最后,我们支持了在触发 Checkpoint 时指定单独的超时时间,做这个优化是因为大状态作业的 Savepoint 的制作时间一般会远超增量 Checkpoint。上图可以看到,Savepoint 制作花费将近 7 分钟,增量的 Checkpoint 只需要一两秒,但是 Savepoint 却直接采用 Checkpoint 的超时间配置,导致我们需要给 Checkpoint 配置一个能够覆盖 Savepoint 的超时时间,这很不利于及早地暴露作业问题。
未来,我们会在以下三个方面继续做改进和稳定性建设:
稳定性方面,我们会继续优化作业的断流时间,提升作业稳定性,探索 k8s 来获取更好的资源隔离和资源扩缩容能力。
运行性能方面,我们会对状态后端做优化来支持大状态作业更好地运行,并对反压做优化让作业在高峰和恢复期运行得更好。
最后在资源效率方面,我们会对作业的资源利用率进行评估和优化,来节省资源和人力成本。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_21383435/article/details/126368691
内容来源于网络,如有侵权,请联系作者删除!