在spark中,对象和变量是如何保存在内存中并跨不同的执行器保存的?
我正在使用:
Spark3.0.0
斯卡拉2.12
我正在写一个Spark结构化流与自定义流源的工作。在执行spark查询之前,我创建了一组由spark流作业使用的元数据
我试图理解这些元数据是如何跨不同的执行器保存在内存中的?
示例代码:
case class JobConfig(fieldName: String, displayName: String, castTo: String)
val jobConfigs:List[JobConfig] = build(); //build the job configs
val query = spark
.readStream
.format("custom-streaming")
.load
query
.writeStream
.trigger(Trigger.ProcessingTime(2, TimeUnit.MINUTES))
.foreachBatch { (batchDF: DataFrame, batchId: Long) => {
CustomJobExecutor.start(jobConfigs) //CustomJobExecutor does data frame transformations and save the data in PostgreSQL.
}
}.outputMode(OutputMode.Append()).start().awaitTermination()
需要帮助了解以下内容:
在示例代码中,spark如何跨不同的执行器将“jobconfigs”保存在内存中?
广播还有什么好处吗?
保留不能反序列化的变量的有效方法是什么?
1条答案
按热度按时间1rhkuytd1#
为每个任务复制局部变量,同时仅为每个执行器复制广播变量。来自文档
spark操作是通过一组阶段执行的,由分布式的“shuffle”操作分隔开来。spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着,只有当跨多个阶段的任务需要相同的数据或者以反序列化的形式缓存数据很重要时,显式创建广播变量才有用。
这意味着,如果jobconfigs足够大,并且使用变量的任务和阶段的数量明显大于执行者的数量,或者反序列化非常耗时,那么广播变量就可以起到作用。在其他情况下,他们没有。