我目前在一个spark项目中有一个管道架构,其中流程a的输出是流程b的输入,流程b的输出是流程c的输入,等等。。。一个进程可以包含一个或多个sparksql查询,并输出缓存的Dataframe。这是一个线性管道,因此进程c将只使用进程b的输出,而不使用进程a的输出。
正如您所想象的,所有这些缓存都会占用大量内存,尤其是在大型数据集上。在进程c上,我不再需要进程a的输出,只需要进程b的输出。通常使用spark,如果我取消缓存进程a的输出,那么进程b的输出也会取消缓存,这在进程c中是需要的。为了解决这个问题,我做了以下工作:
val new_output = spark.sqlContext.createDataFrame(output.rdd, output.schema)
new_output.createOrReplaceTempView("new_output")
spark.sql("CACHE TABLE new_output")
inputDf.unpersist(true)
return new_output
当我这样做的时候,它基本上会剪切dag,这样我就可以取消缓存输入的Dataframe。当我对新的输出Dataframe进行解释时,它看起来是这样的:
== Parsed Logical Plan ==
LogicalRDD [a#168], false
问题:我试图看看这种方法是否会导致与spark的弹性相关的问题,或者以其他方式。
我完全知道我可以使用spark检查点将检查点插入dag。使用上述方法,我不必像检查点那样花费时间将数据写入磁盘/外部存储。我希望将所有这些都保存在同一个spark应用程序中,并在进程之间将数据持久化到内存中。
更新1:添加了更多代码来显示缓存和取消缓存的顺序。使用快速缓存,以便执行新的输出dag并将其存储在内存中,从而可以取消对inputdf的缓存。
暂无答案!
目前还没有任何答案,快来回答吧!