如何检查数据是缓存在dataframe中还是由于pyspark中的延迟执行而尚未缓存?

cxfofazt  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(437)

我的问题和我在堆栈溢出上发现的其他问题没有什么不同。我需要知道数据是否已经被检索并存储在Dataframe中,或者这是否还没有发生
我在做这样的事

df1=spark.table("sourceDB.Table1")
df1.cache()

现在,如您所知,由于延迟执行,尚未从源表读取数据。所以我需要一个表达式,在这里表示结果为“false”。
过了一段时间,我正在做一些需要从源代码检索数据的操作。例如。

df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()

此时,数据必须已被读取并存储在df1的缓存中。所以我需要有一个表达式,在这里表示结果为“真”。
我们到底有没有办法做到这一点?我相信df1.is\u cached在这种情况下不会有帮助

bhmjp9jg

bhmjp9jg1#

也许这是有用的
1如果你想检查 cache/persist 已在Dataframe上触发,然后您可以使用 cachemanager 确认如下-

spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty

2如果您想检查数据是否在内存中,下面的方法可能会有所帮助-

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
      val manager = df.sparkSession.sharedState.cacheManager
      // step 1 - check if the dataframe.cache is issued earlier or not
      if (manager.lookupCachedData(df.queryExecution.logical).nonEmpty) {// cache statement was already issued
        println("Cache statement is already issued on this dataframe")
        // step-2 check if the data is in memory or not
        val cacheData = manager.lookupCachedData(df.queryExecution.logical).get
        cacheData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
      } else false
    }

三。测试上述方法-

val df = spark.read
      .parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
        ".parquet").getPath)
    println(checkIfDataIsInMemory(df))
    /**
      * false
      */

    df.cache()
    // check if the data is cached
    println(checkIfDataIsInMemory(df))
    /**
      * Cache statement is already issued on this dataframe
      * false
      */

    println(df.count())
    println(checkIfDataIsInMemory(df))

    /**
      * 1
      * Cache statement is already issued on this dataframe
      * true
      */

相关问题