我有一个Parquet格式的大数据集(大小约1tb),分为2个层次: CLASS
以及 DATE
只有7节课。但从2020年1月1日开始,这个日期一直在增加。我的数据按 CLASS
先是然后 DATE
比如:
CLASS1---DATE 1
---DATE 2
--- .
--- .
--- .
---DATE N
CLASS2---DATE 1
---DATE 2
--- .
--- .
--- .
---DATE N
我加载我的数据 CLASS
在for循环中。如果我加载整个parquet文件,warn会终止作业,因为它会重载内存示例。但是自从我在我的模型中做百分位计算以来,我每天都在加载。这个方法大约需要23小时才能完成。
但是,如果我重新划分,使我只有 CLASS
分区,这项工作大约需要10个小时。子分区过多是否会降低spark执行器作业的速度?我将分区层次结构保持为 CLASS
-> DATE
只是因为我需要在 DATE
每一天。如果只有一个分区更有效,那么我就必须重新分区到 CLASS
每天加载新数据后分区。有人能解释一下为什么只有一个分区工作得更快吗?如果是这样的话,最好的方法是每天通过追加数据而不重新划分整个数据集来划分数据?
谢谢您
edit:我使用文件结构上的for循环来循环 CLASS
像这样划分:
fs = s3fs.S3FileSystem(anon=False)
inpath="s3://bucket/file.parquet/"
Dirs= fs.ls(inpath)
for paths in Dirs:
customPath='s3://' + uvapath + '/'
class=uvapath.split('=')[1]
df=spark.read.parquet(customPath)
outpath="s3://bucket/Output_" + class + ".parquet"
# Perform calculations
df.write.mode('overwrite').parquet(outpath)
上膛的 df
会有所有的约会日期 CLASS=1
. 然后,我将该文件作为单独的Parquet文件输出 CLASS
因此我有7个Parquet文件:
Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet
然后我合并成一个单一的Parquet7Parquet不是一个问题,因为结果Parquet文件小得多。
1条答案
按热度按时间piah890a1#
我的分区数据有三列:年、月和id
我可以通过加载根路径来读取Dataframe。
然后,分区列会自动添加到Dataframe中。现在,您可以通过以下方式为分区列筛选数据
做点什么
df_filtered
那么spark将只使用分区数据!对于重复处理,可以使用
fair scheduler
Spark的Spark。使用以下代码将fair.xml文件添加到项目的src/main/resources中,并在创建spark会话后设置spark配置。
然后你就可以同时做你的工作了。你可能想并行作业取决于类,所以
代码将同时使用不同的类值。