如何有效地从s3 bucket过滤Dataframe

qqrboqgw  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(380)

我想从按年/月/日/小时划分的s3存储桶中提取指定的天数。这个bucket每天都会添加新文件,而且会变得相当大。我想做什么 spark.read.parquet(<path>).filter(<condition>) 但是,当我运行它时,它比指定路径花费的时间(0.5小时)要长得多(1.5小时)。我不明白为什么要花更长的时间,我应该添加一个 .partitionBy() 从桶里读东西的时候?还是因为bucket中有大量的数据需要过滤?

6mw9ycah

6mw9ycah1#

您面临的问题是关于分区发现的。如果您指向Parquet文件所在的路径 spark.read.parquet("s3://my_bucket/my_folder") spark将在任务管理器中触发一个名为

Listing leaf files and directories for <number> paths

这是一种分区发现方法。为什么会这样?当您使用路径进行调用时,spark无法找到分区的位置以及有多少分区。
在我的情况下,如果我这样计算:

spark.read.parquet("s3://my_bucket/my_folder/").filter('date === "2020-10-10").count()

它将触发列表,大约1700个文件夹需要19秒。加上7秒,总共有26秒。
要解决这个开销时间,您应该使用元存储。aws用aws胶水提供了一个很好的解决方案,就像hadoop环境中的hivemetastore一样使用。
使用glue可以存储表元数据和所有分区。而不是给出Parquet地板路径,而是指向table,就像这样:

spark.table("my_db.my_table").filter('date === "2020-10-10").count()

对于相同的数据,使用相同的过滤器。列表文件不存在,整个计数过程只用了9秒。
在你的情况下,你按年、月、日、时划分。我们每年讨论8760个文件夹。
我建议你看看这个链接和这个链接
这将展示如何使用胶水作为您的Hive元存储。这对提高分区查询的速度有很大的帮助。

相关问题