在List Spark Scala中缓存多个 Dataframe

li9yvcax  于 2023-03-02  发布在  Scala
关注(0)|答案(1)|浏览(206)

我有一个List[DataFrame],其中包含从Snowflake表的一些连接中获得的小型 Dataframe 。
目的是缓存列表中的每个DF,以便以后留在内存中。
为了简化这种情况,到目前为止我所尝试的是缓存它们并强制执行一个操作:

  1. val listDfs = [dfA, dfB, dfC]
  2. val dfACached = listDfs.head.cache
  3. // Force an action to enable fully cache on dfA
  4. dfACached.count
  5. val dfBCached = listDfs(2).cache
  6. // Force an action to enable fully cache on dfB
  7. dfBCached.count
  8. val dfCCached = listDfs.last.cache
  9. // Force an action to enable fully cache on dfC
  10. dfCCached.count

然后我需要在foreach循环中使用这些变量来过滤它们,所以我将它们包含在另一个列表中以便能够引用它们:

  1. val listDfsCached: List[DataFrame] = List(dfACached,dfBCached,dfCCached)

到目前为止,这三个 Dataframe 应该缓存在内存中并可以快速访问。
然后,我得到了一个字符串数组,用于过滤DF:

  1. val arrayFilters = ["a", "b", "c"]
  2. listDfsCached.foreach(df => {
  3. val dfFiltered = df.filter(col("test") === str)
  4. // Finally I'll perform some other transformations and write a json file per string in the array
  5. }
  6. }

我可以观察到的是DF没有被正确缓存,因为在执行期间,程序每次都会返回Snowflake重新计算dfA、dfB、dfC以获得dfFiltered。
你知道我做错了什么吗?
我在本地调试中使用Spark版本3.2.1(1个节点- 4个内核)
先谢了。

7vux5j2d

7vux5j2d1#

您可以缓存多个DataFrame,方法是将它们存储在列表中,然后遍历列表以缓存每个DataFrame。下面是一个示例:

  1. import org.apache.spark.sql.DataFrame
  2. // define your DataFrames
  3. val df1: DataFrame = ...
  4. val df2: DataFrame = ...
  5. val df3: DataFrame = ...
  6. // store DataFrames in a list
  7. val dfList: List[DataFrame] = List(df1, df2, df3)
  8. // iterate over list and cache each DataFrame
  9. dfList.foreach(df => df.cache())

一旦缓存, Dataframe 将保留在内存中,直到该高速缓存被清除或Spark应用程序终止。

相关问题