使用spark createdataframe剪切dag

62o28rlo  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(287)

我目前在一个spark项目中有一个管道架构,其中流程a的输出是流程b的输入,流程b的输出是流程c的输入,等等。。。一个进程可以包含一个或多个sparksql查询,并输出缓存的Dataframe。这是一个线性管道,因此进程c将只使用进程b的输出,而不使用进程a的输出。
正如您所想象的,所有这些缓存都会占用大量内存,尤其是在大型数据集上。在进程c上,我不再需要进程a的输出,只需要进程b的输出。通常使用spark,如果我取消缓存进程a的输出,那么进程b的输出也会取消缓存,这在进程c中是需要的。为了解决这个问题,我做了以下工作:

val new_output = spark.sqlContext.createDataFrame(output.rdd, output.schema)

new_output.createOrReplaceTempView("new_output")
spark.sql("CACHE TABLE new_output")

inputDf.unpersist(true)

return new_output

当我这样做的时候,它基本上会剪切dag,这样我就可以取消缓存输入的Dataframe。当我对新的输出Dataframe进行解释时,它看起来是这样的:

== Parsed Logical Plan ==
LogicalRDD [a#168], false

问题:我试图看看这种方法是否会导致与spark的弹性相关的问题,或者以其他方式。
我完全知道我可以使用spark检查点将检查点插入dag。使用上述方法,我不必像检查点那样花费时间将数据写入磁盘/外部存储。我希望将所有这些都保存在同一个spark应用程序中,并在进程之间将数据持久化到内存中。
更新1:添加了更多代码来显示缓存和取消缓存的顺序。使用快速缓存,以便执行新的输出dag并将其存储在内存中,从而可以取消对inputdf的缓存。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题