💅在前面的spark优化学习中,我们学习了spark的语法、资源调度、sql语法优化和数据倾斜的技巧,今天我们来学习spark中的作业优化,也就是job优化。对往期内容感兴趣的同学可以参考👇:
🙈关于spark的作业优化,主要是对map端和reduce端作业这两个部分进行优化,优化的内容可能和处理数据倾斜时的内容有点重复,重复的部分就当作复习啦。
map-side 预聚合:就是在每个节点本地对相同的 key 进行一次聚合操作,类似于MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的 key 都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。( 使用sparksql时,会自动使用HashAggregte 实现本地预聚合+全局聚合)
操作RDD :建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子。因为 reduceByKey 和 aggregateByKey 算子都会使用用户自定义的函数对每个节点本地的相同 key 进行预聚合。而 groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的 Task 元信息也会给 Spark Driver 的内存造成压力,带来单点问题。
hive的处理方式:在hive中,我们在map端执行前合并小文件,使用的是combinerhiveinputformat方法,对小文件进行合并,而hiveinputformat没有对小文件合并的功能。
#设置方法如下:
set hive.input.format=org.apche.hadoop.hive.sq.io.CombinerHiveInputFormat
spark的处理方式:spark也可以通过设置参数,设置多个小文件读取到同一个分区中,达到小文件合并的目的。
#设置一个分区的最大字节数
spark.sql.files.maxPartitionBytes=128MB #默认 128m
#设置打开一个文件的开销
spark.files.openCostInBytes=4194304 #默认 4m
这里我们解释一下这两个参数的作用:
map端任务执行的流程图:
增加map端的缓冲区大小也能提高效率,缓冲区的参数主要有以下几个:
综上可知,我们只能通过设置输出流缓冲区的大小来提高效率:
spark.shuffle.file.buffer=64 #将输出流缓冲区大小从32k调整到64k
关于这个部分,主要是对cpu资源的分配,并发度、并行度等的设置,大家可以参考下面的文章:
当我们把几张表join之后的结果插入新表后,生成的文件数数等于 shuffle 并行度,默认就是 200 份文件插入到hdfs 上。
优化方式:
解决方法:
将倾斜键单独加入动态分区
//1.非倾斜键部分
INSERT overwrite table A partition ( aa )
SELECT *
FROM B where aa != 大 key
distribute by aa;
//2.倾斜键部分
INSERT overwrite table A partition ( aa )
SELECT *
FROM B where aa = 大 key
distribute by cast(rand() * 5 as int);
map端端缓冲区大小不可设置,只能是5m,但reduce端端buffer大小可以设置,shuffle reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
设置方式:
spark.reducer.maxSizeInFlight=48
reduce task 拉取属于自己的数据时,如果因为网络异常等原因导
致失败会自动进行重试。对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如 60 次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。reduce 端拉取数据重试次数可以通过
spark.shuffle.io.maxRetries
参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为 3
Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性。reduce 端拉取数据等待间隔可以通过
spark.shuffle.io.retryWait
参数进行设置,默认值为 5s。
当 ShuffleManager 为 SortShuffleManager 时,如果 shuffle read task 的数量小于这个阈值(默认是 200)且不需要 map 端进行合并操作,则 shuffle write 过程中不会进行排序操作,使用 BypassMergeSortShuffleWriter 去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
当你使用 SortShuffleManager 时,如果确实不需要排序操作,那么建议将这个参数调大一些,大于 shuffle read task 的数量。那么此时就会自动启用 bypass 机制,map-side 就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此 shuffle write 性能有待提高。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://liuxiaocong.blog.csdn.net/article/details/124006095
内容来源于网络,如有侵权,请联系作者删除!