在List Spark Scala中缓存多个 Dataframe

li9yvcax  于 2023-03-02  发布在  Scala
关注(0)|答案(1)|浏览(168)

我有一个List[DataFrame],其中包含从Snowflake表的一些连接中获得的小型 Dataframe 。
目的是缓存列表中的每个DF,以便以后留在内存中。
为了简化这种情况,到目前为止我所尝试的是缓存它们并强制执行一个操作:

val listDfs = [dfA, dfB, dfC]

val dfACached = listDfs.head.cache
// Force an action to enable fully cache on dfA
dfACached.count

val dfBCached = listDfs(2).cache
// Force an action to enable fully cache on dfB
dfBCached.count

val dfCCached = listDfs.last.cache
// Force an action to enable fully cache on dfC
dfCCached.count

然后我需要在foreach循环中使用这些变量来过滤它们,所以我将它们包含在另一个列表中以便能够引用它们:

val listDfsCached: List[DataFrame] = List(dfACached,dfBCached,dfCCached)

到目前为止,这三个 Dataframe 应该缓存在内存中并可以快速访问。
然后,我得到了一个字符串数组,用于过滤DF:

val arrayFilters = ["a", "b", "c"]

listDfsCached.foreach(df => {
    val dfFiltered = df.filter(col("test") === str)
    // Finally I'll perform some other transformations and write a json file per string in the array
  }
}

我可以观察到的是DF没有被正确缓存,因为在执行期间,程序每次都会返回Snowflake重新计算dfA、dfB、dfC以获得dfFiltered。
你知道我做错了什么吗?
我在本地调试中使用Spark版本3.2.1(1个节点- 4个内核)
先谢了。

7vux5j2d

7vux5j2d1#

您可以缓存多个DataFrame,方法是将它们存储在列表中,然后遍历列表以缓存每个DataFrame。下面是一个示例:

import org.apache.spark.sql.DataFrame

// define your DataFrames
val df1: DataFrame = ...
val df2: DataFrame = ...
val df3: DataFrame = ...

// store DataFrames in a list
val dfList: List[DataFrame] = List(df1, df2, df3)

// iterate over list and cache each DataFrame
dfList.foreach(df => df.cache())

一旦缓存, Dataframe 将保留在内存中,直到该高速缓存被清除或Spark应用程序终止。

相关问题