我有一个List[DataFrame],其中包含从Snowflake表的一些连接中获得的小型 Dataframe 。
目的是缓存列表中的每个DF,以便以后留在内存中。
为了简化这种情况,到目前为止我所尝试的是缓存它们并强制执行一个操作:
val listDfs = [dfA, dfB, dfC]
val dfACached = listDfs.head.cache
// Force an action to enable fully cache on dfA
dfACached.count
val dfBCached = listDfs(2).cache
// Force an action to enable fully cache on dfB
dfBCached.count
val dfCCached = listDfs.last.cache
// Force an action to enable fully cache on dfC
dfCCached.count
然后我需要在foreach循环中使用这些变量来过滤它们,所以我将它们包含在另一个列表中以便能够引用它们:
val listDfsCached: List[DataFrame] = List(dfACached,dfBCached,dfCCached)
到目前为止,这三个 Dataframe 应该缓存在内存中并可以快速访问。
然后,我得到了一个字符串数组,用于过滤DF:
val arrayFilters = ["a", "b", "c"]
listDfsCached.foreach(df => {
val dfFiltered = df.filter(col("test") === str)
// Finally I'll perform some other transformations and write a json file per string in the array
}
}
我可以观察到的是DF没有被正确缓存,因为在执行期间,程序每次都会返回Snowflake重新计算dfA、dfB、dfC以获得dfFiltered。
你知道我做错了什么吗?
我在本地调试中使用Spark版本3.2.1(1个节点- 4个内核)
先谢了。
1条答案
按热度按时间7vux5j2d1#
您可以缓存多个DataFrame,方法是将它们存储在列表中,然后遍历列表以缓存每个DataFrame。下面是一个示例:
一旦缓存, Dataframe 将保留在内存中,直到该高速缓存被清除或Spark应用程序终止。