pyspark: saving a DataFrame to Hadoop/HDFS without running out of memory?

Asked by euoag5mw on 2021-05-29 in Hadoop

I'm working on a pipeline that reads a number of Hive tables and parses them into DenseVectors for eventual use in Spark ML. I want to run many iterations to find the optimal training parameters, both inputs to the model and compute resources. The DataFrame I'm working with is somewhere between 50 and 100 GB, spread across a dynamic number of executors on a YARN cluster.
Whenever I try to save it, either to Parquet or with saveAsTable, I get a series of failed tasks until the job finally fails altogether and suggests raising spark.yarn.executor.memoryOverhead. Each id is a single row, no more than a few kB.

feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet', mode='overwrite', partitionBy='id')

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 98 in stage 33.0 failed 4 times, most recent failure: Lost task 98.3 in 
stage 33.0 (TID 2141, rs172.hadoop.pvt, executor 441): ExecutorLostFailure 
(executor 441 exited caused by one of the running tasks) Reason: Container 
killed by YARN for exceeding memory limits. 12.0 GB of 12 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.

I currently have that set to 2g.
The Spark workers currently get 10 GB each, and the driver (which is not on the cluster) gets 16 GB, with a maxResultSize of 5 GB.
I cache the DataFrame before writing it. What else can I do to troubleshoot?
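
For context, a minimal sketch of how the settings described above are typically expressed in PySpark (the app name is illustrative; in yarn-client mode the driver-side values generally have to be passed to spark-submit instead, since the driver JVM is already running by the time the builder executes):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('feature-pipeline')                           # illustrative name
         .config('spark.executor.memory', '10g')                # worker memory from the question
         .config('spark.yarn.executor.memoryOverhead', '2048')  # the 2g overhead mentioned above, in MB
         .config('spark.driver.memory', '16g')                  # only effective before the driver JVM starts
         .config('spark.driver.maxResultSize', '5g')
         .getOrCreate())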
Edit: it seems to be trying to do all of my transformations at once. When I look at the details for the saveAsTable() method:

== Physical Plan ==
InMemoryTableScan [id#0L, label#90, features#119]
   +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter (isnotnull(id#0L) && (id#0L < 21326835))
            +- InMemoryTableScan [id#0L, label#90, features#119], [isnotnull(id#0L), (id#0L < 21326835)]
                  +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *Project [id#0L, label#90, pythonUDF0#135 AS features#119]
                           +- BatchEvalPython [<lambda>(collect_list_is#108, 56845.0)], [id#0L, label#90, collect_list_is#108, pythonUDF0#135]
                              +- SortAggregate(key=[id#0L, label#90], functions=[collect_list(indexedSegs#39, 0, 0)], output=[id#0L, label#90, collect_list_is#108])
                                 +- *Sort [id#0L ASC NULLS FIRST, label#90 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(id#0L, label#90, 200)
                                       +- *Project [id#0L, UDF(segment#2) AS indexedSegs#39, cast(label#1 as double) AS label#90]
                                          +- *BroadcastHashJoin [segment#2], [entry#12], LeftOuter, BuildRight
                                             :- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, reka_data_long_all_files
                                             +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
                                                +- *Project [cast(entry#7 as string) AS entry#12]
                                                   +- HiveTableScan [entry#7], MetastoreRelation reka_trop50, public_crafted_audiences_sized
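
For anyone retracing this, a minimal sketch of how to pull the same information out of the DataFrame itself, assuming feature_df is the cached DataFrame from above:

feature_df.explain(extended=True)         # parsed/analyzed/optimized/physical plans
print(feature_df.rdd.getNumPartitions())  # roughly how many tasks the write will spawn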

Answer 1 (c0vxltue)

My suggestion would be to disable dynamic allocation. Try running it with the configuration below:

--master yarn-client --driver-memory 15g --executor-memory 15g --executor-cores 10 --num-executors 15 --conf spark.yarn.executor.memoryOverhead=20000 --conf spark.yarn.driver.memoryOverhead=20000 --conf spark.default.parallelism=500
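
The command above does not show the dynamic-allocation switch itself; a minimal sketch of turning it off explicitly and pinning the executor count, using standard Spark config keys and the numbers suggested above:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config('spark.dynamicAllocation.enabled', 'false')      # no dynamic executor scaling
         .config('spark.executor.instances', '15')                # fixed executor count
         .config('spark.executor.cores', '10')
         .config('spark.executor.memory', '15g')
         .config('spark.yarn.executor.memoryOverhead', '20000')   # in MB
         .config('spark.default.parallelism', '500')
         .getOrCreate())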

Answer 2 (ttisahbt)

In the end, the clue I got from the Spark user mailing list was to look at the partitions: both their balance and their sizes. As the planner had it, too much was being handed to a single executor instance. Adding .repartition(1000) to the expression that creates the DataFrame to be written made all the difference, and more could probably be gained by creating, and repartitioning on, a smart key column.
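
A minimal sketch of that suggestion, reusing the path from the question (the partition count of 1000 follows the answer; the choice of 'id' as the key column is illustrative):

balanced_df = feature_df.repartition(1000, 'id')   # spread rows evenly across 1000 partitions
balanced_df.write.parquet('hdfs:///user/myuser/featuredf.parquet', mode='overwrite')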
