pyspark数据从非分区配置单元表加载到分区配置单元表的性能调优

hjzp0vay  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(370)

我们需要通过pyspark将数据从非分区的外部配置单元表work\u db.customer\u tbl摄取到分区的外部配置单元表final\u db.customer\u tbl,之前是通过配置单元查询完成的。最后一个表按load\ U date列进行分区(load\ U date列的格式为yyyy mm dd)。
因此,我们有一个简单的pyspark脚本,它使用insert查询(与前面使用的配置单元查询相同)来使用spark.sql()命令摄取数据。但是我们有一些严重的性能问题,因为在摄取之后我们尝试摄取的表有大约3000个分区,每个分区有大约4MB的数据,除了最后一个大约4gb的分区。表的总大小接近15gb。而且,在摄取之后,每个分区有217个文件。最后一张table是一张时髦的压缩Parquet地板table。
源工作表有一个15 gb的文件,文件名格式为customers\u tbl\u unload.dat。
早些时候,当我们通过直线连接使用配置单元查询时,通常需要25-30分钟才能完成。现在,当我们尝试使用pyspark脚本时,大约需要3个小时才能完成。
我们如何调整Spark性能,使摄取时间小于直线所需的时间。

The configurations of the yarn queue we use is:
Used Resources: <memory:5117184, vCores:627>
Demand Resources:   <memory:5120000, vCores:1000>
AM Used Resources:  <memory:163072, vCores:45>
AM Max Resources:   <memory:2560000, vCores:500>
Num Active Applications:    45
Num Pending Applications:   45
Min Resources:  <memory:0, vCores:0>
Max Resources:  <memory:5120000, vCores:1000>
Reserved Resources: <memory:0, vCores:0>
Max Running Applications:   200
Steady Fair Share:  <memory:5120000, vCores:474>
Instantaneous Fair Share:   <memory:5120000, vCores:1000>
Preemptable:    true

The parameters passed to the PySpark script is:
num-executors=50
executor-cores=5
executor-memory=10GB

PySpark code used:
insert_stmt = """INSERT INTO final_db.customers_tbl PARTITION(load_date) 
SELECT col_1,col_2,...,load_date FROM work_db.customer_tbl"""
spark.sql(insert_stmt)

即使使用了Yarn队列中10%的资源,这项工作也要花费大量时间。我们如何调整工作使之更有效率。

gg58donl

gg58donl1#

您需要重新分析您的数据集,看看您是否使用了正确的方法,将yoir数据集按日期列进行分区,还是应该按年份进行分区?要理解为什么每个分区有200多个文件,您需要了解spark分区和hive分区之间的区别。您应该首先尝试的一种直接方法是将输入数据集作为Dataframe读取,并按您计划在配置单元中用作分区键的键对其进行分区,然后使用df.write.partitionby保存它
由于数据似乎在date列上过于倾斜,请尝试在其他列上对其进行分区,这些列可能具有相同的数据分布。否则,过滤掉歪斜的数据并分别进行处理

相关问题