scala Spark: Dataframe 检查点与显式写入磁盘

vu8f3i0k  于 2023-04-30  发布在  Scala
关注(0)|答案(2)|浏览(143)

检查点版本:

val savePath = "/some/path"
spark.sparkContext.setCheckpointDir(savePath)
df.checkpoint()

写入磁盘版本:

df.write.parquet(savePath)
val df = spark.read.parquet(savePath)

我认为两者都以同样的方式打破了血统。
在我的实验中,检查点在磁盘上的大小几乎是parquet的30倍(689GB与24GB)。在运行时间方面,checkpoint需要1.5倍长(10。5分钟对7.5分钟)。
考虑到所有这些,使用检查点而不是保存到文件有什么意义?我错过什么了吗?

rsaldnfx

rsaldnfx1#

检查点是截断RDD沿袭图并将其保存到可靠的分布式(HDFS)或本地文件系统的过程。如果你有一个很大的RDD沿袭图,并且你想冻结当前RDD i的内容。在进行下一步之前具体化完整的RDD,通常使用persist或checkpoint。检查点RDD然后可以用于一些其他目的。
当您检查点时,RDD被序列化并存储在磁盘中。它不存储在Parquet格式,所以数据是不正确的存储优化的磁盘。对比parquet,它提供了各种压缩和编码来存储优化数据。这可以解释尺寸的差异。

  • 您绝对应该考虑在嘈杂的集群中设置检查点。如果有大量的作业和用户竞争资源,并且没有足够的资源来同时运行所有的作业,则集群被称为噪声集群。
  • 如果你的计算真的很昂贵并且需要很长时间才能完成,你必须考虑检查点,因为将RDD写入HDFS并并行读取它可能比从头开始重新计算更快。

在spark 2之前有一点小小的不便。1版本;没有办法对 Dataframe 进行检查点检查,因此您必须对底层RDD进行检查点检查。此issue已在spark 2中解决。1及以上版本。
在parquet中保存到磁盘并读回的问题是

  • 可能在编码中不方便。您需要多次保存和读取。
  • 在作业的整体性能中,这可能是一个较慢的过程。因为当你保存为 parquet 并读取它时,数据框架需要再次重建。

wiki可用于进一步调查
如数据集检查点wiki中所示
检查点实际上是Spark Core(Spark SQL用于分布式计算)的一个功能,它允许驱动程序在失败时重新启动,并将先前计算的分布式计算状态描述为RDD。这已经成功地用于Spark Streaming -现在已经过时的Spark模块,用于基于RDD API的流处理。
检查点截断要检查的RDD的谱系。这已经在Spark MLlib中成功地用于迭代机器学习算法,如ALS。
Spark SQL中的Dataset checkpointing使用checkpointing来截断被检查的Dataset的底层RDD的血统。

kuarbcqp

kuarbcqp2#

一个区别是,如果你的spark作业需要一个特定的内存分区方案,例如,如果你使用一个窗口函数,那么检查点将持久化到磁盘,而写入parquet不会。
我不知道spark的当前版本有什么方法可以编写Parquet文件,然后再读取它们,并使用特定的内存分区策略。文件夹级分区对此没有帮助。

相关问题