通过RDD和缓存的作用进行Apache Spark Dataframe 谱系修整

ecbunoof  于 2022-12-23  发布在  Apache
关注(0)|答案(1)|浏览(142)

有以下技巧可以修剪Apache Spark Dataframe 谱系,特别是对于迭代计算:

def getCachedDataFrame(df: DataFrame): DataFrame = {
    val rdd = df.rdd.cache()
    df.sqlContext.createDataFrame(rdd, df.schema)
}

这看起来像是某种纯粹的魔术,但是现在我想知道为什么我们需要在RDD上调用cache()方法?在这个沿袭修整逻辑中使用缓存的目的是什么?

2hh7jdfx

2hh7jdfx1#

要理解缓存的用途,了解不同类型的RDD操作会有所帮助:转换和操作。来自文档:
RDD支持两种类型的操作:转换,从现有数据集创建新数据集;操作,在数据集上运行计算后将值返回到驱动程序。
还要考虑这一点:
Spark中的所有转换都是惰性的,因为它们不会立即计算结果,而是只记住应用于某个基本数据集的转换(例如文件)。只有当动作需要返回结果给驱动程序时,才会计算转换。这种设计使Spark能够更有效地运行。例如,我们可以认识到,通过Map创建的数据集将在Reduce中使用,并且仅将Reduce的结果而不是较大的Map数据集返回给驱动程序。
所以Spark的转换(比如map)都是惰性的,因为这有助于Spark在创建查询计划时更明智地考虑需要进行哪些计算。

这与缓存有什么关系?

请看下面的代码:

// Reading in some data
val df = spark.read.parquet("some_big_file.parquet")

// Applying some transformations on the data (lazy operations here)
val cleansedDF = df
  .filter(filteringFunction)
  .map(cleansingFunction)

// Executing an action, triggering the transformations to be calculated
cleansedDF.write.parquet("output_file.parquet")

// Executing another action, triggering the transformations to be calculated
// again
println(s"You have ${cleansedDF.count} rows in the cleansed data")

在这里,我们读入一些文件,应用一些转换,并对同一 Dataframe 应用2个操作:cleansedDF.write.parquetcleansedDF.count
正如代码中的注解所解释的那样,如果我们像这样运行代码,我们实际上将计算两次这些转换,因为转换是惰性的,只有当操作需要执行它们时,它们才会执行。
如何防止这种重复计算?使用缓存:我们可以告诉Spark保留一些转换的"保存"结果,这样它们就不必被计算多次。这可以是在磁盘/内存/...
有了这些知识,我们的代码可能是这样的:

// Reading in some data
val df = spark.read.parquet("some_big_file.parquet")

// Applying some transformations on the data (lazy operations here) AND caching the result of this calculation
val cleansedDF = df
  .filter(filteringFunction)
  .map(cleansingFunction)
  .cache

// Executing an action, triggering the transformations to be calculated AND the results to be cached
cleansedDF.write.parquet("output_file.parquet")

// Executing another action, reusing the cached data
println(s"You have ${cleansedDF.count} rows in the cleansed data")

我调整了这个代码块中的注解,以突出显示与上一个代码块的不同之处。
注意.persist也存在,使用.cache时使用默认的存储级别,使用.persist时可以指定存储级别,this SO answer对此做了很好的解释。
希望这有帮助!

相关问题