为什么spark lazy加载比通配符或传递数组中的文件夹慢?

mdfafbf1  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(156)

我有一个数据集,它被分区并作为一组JSON文件写入s3。
分区结构为:uid -通常小于20个分区。Int,可能值:1-100,70000-70100,80000-80100 type - String 2 partitions date -作为通常的日期。通常在2个月的范围内。hour -明显的24个分区。
一个分区内有一个JSON文件。一般数据量~ 30 GB
当我通过根文件夹,因为它花了**~3分30秒**打印文件的架构。

val df = spark.read.json("s3://path/to/root/folder")
df.printSchema()

当我通过根文件夹,但执行初始过滤,它也花了**~3分30秒**。

val df = spark.read.json("s3://path/to/root/folder")
  .filter(col("uid") === 1 || col("uid") === 80004)
df.printSchema()

我通过通配所有分区将加载时间减少到2分31秒

val df = spark.read.json("s3://path/to/root/folder/uid=*/type=*/date=*/hour=*")
df.printSchema()

我试着把所有需要的uid都括起来,它起作用了。我把加载时间减少到了**~ 34秒**

val df = spark.read
  .option("basePath", "s3://path/to/root/folder")
  .json("s3://path/to/root/folder/uid={1,80004}") 
df.printSchema()

我还尝试在数组中添加所需的文件夹,并获得相同的时间。

val df = spark.read
  .option("basePath", "s3://path/to/root/folder/")
  .json("s3://path/to/root/folder/uid=1/type=OW/date=2019-07-24/",
        "s3://path/to/root/folder/uid=1/type=OW/date=2019-07-25/",
        "s3://path/to/root/folder/
        ...
        "s3://path/to/root/folder/uid=80004/type=OW/date=2019-08-13")

我认为延迟加载应该对我有帮助,但它没有。有人能解释一下为什么吗?

r1zhe5dt

r1zhe5dt1#

我不知道为什么你使用通配符会获得更快的时间..但是对于显式文件夹和完整数据来说,时间差可能是由于spark阅读器需要对所有数据进行完整扫描以推断模式。模式推断步骤在你调用read时立即发生,因为spark需要知道模式才能继续,并导致Spark在后台执行操作。
您可以尝试将samplingRatio选项的默认值从1.0降低到:

val df = spark.read.option("samplingRatio", "0.1").json("s3://...")

或者,您可以尝试显式指定模式:

import org.apache.spark.sql.types._

val schema = StructType(
    StructField("uid",IntegerType,true) ::
    StructField("type",StringType,true) :: 
    ... :: Nil
)

val df = spark.read.schema(schema).json("s3://.....")

predicate 下推优化只会加载你的数据的一些分区,但它不会发生,直到你对加载的对象执行一个操作。你可以通过运行explain并查看物理计划来确认这一点,如下所示,在PartitionFilters部分:

df.filter($"uid" === "1" && $"type" === "bob").explain(true)
== Parsed Logical Plan ==
'Filter (('uid = 1) && ('type = bob))
+- Relation[key#70,key2#71,uid#72,type#73] json

== Analyzed Logical Plan ==
key: string, key2: string, uid: int, type: string
Filter ((uid#72 = cast(1 as int)) && (type#73 = bob))
+- Relation[key#70,key2#71,uid#72,type#73] json

== Optimized Logical Plan ==
Filter (((isnotnull(uid#72) && isnotnull(type#73)) && (uid#72 = 1)) && (type#73 = bob))
+- Relation[key#70,key2#71,uid#72,type#73] json

== Physical Plan ==
*(1) FileScan json [key#70,key2#71,uid#72,type#73] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/.../test/stackoverflow], PartitionCount: 1, PartitionFilters: [isnotnull(uid#72), isnotnull(type#73), (uid#72 = 1), (type#73 = bob)], PushedFilters: [], ReadSchema: struct<key:string,key2:string>

相关问题