即使有一个 .cache()
d rdd,spark似乎仍然序列化每个任务运行的数据。考虑以下代码:
class LoggingSerializable() extends Externalizable {
override def writeExternal(out: ObjectOutput): Unit = {
println("xxx serializing")
}
override def readExternal(in: ObjectInput): Unit = {
println("xxx deserializing")
}
}
object SparkSer {
def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("SparkSer").setMaster("local")
val spark = new SparkContext(conf)
val rdd: RDD[LoggingSerializable] = spark.parallelize(Seq(new LoggingSerializable())).cache()
println("xxx done loading")
rdd.foreach(ConstantClosure)
println("xxx done 1")
rdd.foreach(ConstantClosure)
println("xxx done 2")
spark.stop()
}
}
object ConstantClosure extends (LoggingSerializable => Unit) with Serializable {
def apply(t: LoggingSerializable): Unit = {
println("xxx closure ran")
}
}
它打印出来了
xxx done loading
xxx serializing
xxx deserializing
xxx closure ran
xxx done 1
xxx serializing
xxx deserializing
xxx closure ran
xxx done 2
尽管我打过电话 .cache()
在 rdd
,spark仍然序列化每个调用的数据 .foreach
. 官方文件说
当您持久化rdd时,每个节点将其计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重用这些分区。
然后呢 MEMORY_ONLY
方法
将rdd作为反序列化的java对象存储在jvm中。
请注意,spark在序列化任务时尝试序列化数据,但是 ConstantClosure
不关闭任何内容,所以我不明白为什么需要序列化任何数据。
我之所以这么问是因为我希望能够在本地模式下运行spark而不损失任何性能,但是对于每个rdd操作,必须在rdd中序列化大型元素可能会非常昂贵。我不确定这个问题是否是本地模式独有的。spark似乎不可能通过有线将rdd中的数据发送给每个操作的工人,即使rdd被缓存了。
我用的是spark core 3.0.0。
2条答案
按热度按时间fiei3ece1#
这是因为你正在使用
parallelize
.parallelize
使用特殊的rdd,ParallelCollectionRDD
,将数据放入Partition
s。Partition
定义一个spark任务,它将被发送给spark任务中的执行者(ShuffleMapTask
或者ResultTask
). 如果在中打印堆栈跟踪readExternal
以及writeExternal
,您应该能够看到它在序列化和反序列化spark任务时发生。换句话说,数据是spark任务元数据的一部分
ParallelCollectionRDD
,spark必须发送任务以在执行器中运行,这就是序列化发生的地方。大多数其他RDD从外部系统(如文件)读取数据,因此它们没有这种行为。
6ojccjat2#
我同意这种行为看起来令人惊讶。在我的脑海里,我可能会猜测,这是因为缓存块是异步的,所有这一切发生得非常快。有可能它只是不等待缓存分区变为可用并在第二次重新计算它。
为了验证这个假设,在第二个foreach之前引入一个长时间的等待,看看这是否会改变事情。