我有一个数据集,它被分区并作为一组JSON文件写入s3。
分区结构为:uid -通常小于20个分区。Int,可能值:1-100,70000-70100,80000-80100 type - String 2 partitions date -作为通常的日期。通常在2个月的范围内。hour -明显的24个分区。
一个分区内有一个JSON文件。一般数据量~ 30 GB
当我通过根文件夹,因为它花了**~3分30秒**打印文件的架构。
val df = spark.read.json("s3://path/to/root/folder")
df.printSchema()
当我通过根文件夹,但执行初始过滤,它也花了**~3分30秒**。
val df = spark.read.json("s3://path/to/root/folder")
.filter(col("uid") === 1 || col("uid") === 80004)
df.printSchema()
我通过通配所有分区将加载时间减少到2分31秒
val df = spark.read.json("s3://path/to/root/folder/uid=*/type=*/date=*/hour=*")
df.printSchema()
我试着把所有需要的uid都括起来,它起作用了。我把加载时间减少到了**~ 34秒**
val df = spark.read
.option("basePath", "s3://path/to/root/folder")
.json("s3://path/to/root/folder/uid={1,80004}")
df.printSchema()
我还尝试在数组中添加所需的文件夹,并获得相同的时间。
val df = spark.read
.option("basePath", "s3://path/to/root/folder/")
.json("s3://path/to/root/folder/uid=1/type=OW/date=2019-07-24/",
"s3://path/to/root/folder/uid=1/type=OW/date=2019-07-25/",
"s3://path/to/root/folder/
...
"s3://path/to/root/folder/uid=80004/type=OW/date=2019-08-13")
我认为延迟加载应该对我有帮助,但它没有。有人能解释一下为什么吗?
1条答案
按热度按时间r1zhe5dt1#
我不知道为什么你使用通配符会获得更快的时间..但是对于显式文件夹和完整数据来说,时间差可能是由于spark阅读器需要对所有数据进行完整扫描以推断模式。模式推断步骤在你调用read时立即发生,因为spark需要知道模式才能继续,并导致Spark在后台执行操作。
您可以尝试将samplingRatio选项的默认值从1.0降低到:
或者,您可以尝试显式指定模式:
predicate 下推优化只会加载你的数据的一些分区,但它不会发生,直到你对加载的对象执行一个操作。你可以通过运行explain并查看物理计划来确认这一点,如下所示,在PartitionFilters部分: