我有一个非常大的Dataframe大约2tb的大小。我可以用两列对它们进行分区: MODULE
以及 DATE
如果我用 MODULE
例如,每个模块可以有相同的日期 MODULE A
可能有约会 2020-07-01 , 2020-07-02
以及 MODULE B
可能有 2020-07-01 , 2020-07-05
我需要先用 MODULE
在我最终划分和存储它们之前做一些聚合和连接 DATE
. 我用pyspark来编码。
在完成聚合和按模块连接之后,我将它附加到一个Parquet文件,并将整个Parquet文件加载到一个Dataframe中,然后按日期对其进行分区。问题是spark作业由于内存问题而终止。我可以直接按日期分区吗 MODULE
分区?所以分区看起来像这样:输入格式: s3://path/MODULE=A --> s3://path/DATE=2020-07-01
两个模块 A
& B
存在于分区中 DATE=2020-07-01
?
这是我的原始代码,由于在群集中的时间过长和内存不足而失败:
inpath="s3://path/file/"
outpath="s3://path/file_tmp.parquet"
fs = s3fs.S3FileSystem(anon=False)
uvaDirs = fs.ls(inpath)
# Load Data by Module
for uvapath in uvaDirs:
customPath='s3://' + uvapath + '/'
df1=spark.read.parquet(customPath)
#Perform aggregations and joins
df1.write.mode('append').parquet(outpath)
# Load - partition by date
df2=spark.read.parquet("s3://path/file_tmp.parquet")
df2.write.mode('overwrite').partitionBy("DATE").parquet("s3://path/final.parquet")
它成功地创建了 file_tmp.parquet
但在按日期加载和分区时失败。任何帮助都将不胜感激!谢谢您
1条答案
按热度按时间zrfyljdw1#
像delta datasource一样,delta存储为parquet
参考:https://docs.delta.io/latest/best-practices.html#-delta压缩文件和python语言