scala 执行计划在Spark中如何工作?

uqcuzwp8  于 2023-05-17  发布在  Scala
关注(0)|答案(2)|浏览(149)

我有一个情况,我需要在完成所有转换后在两个不同的表中写入相同的数据。
过程类似于:
1.阅读来源
1.应用转换
1.在TableA中写入
现在,我想在TableB中也写入相同的数据,只是在分区中有轻微的变化(例如:删除我在TableA中写入时使用的一些分区)。
那么如果我这样做,在TableA中写入完成后,在写入TableB时,步骤1和2是否会再次发生?
做这件事的有效方法是什么?
谢谢!

zengzsys

zengzsys1#

执行计划是被执行以将查询语言语句转换成由DAG表示的优化的逻辑和物理操作的集合的操作的集合。
在Spark中,我们有一个名为Catalyst的优化器,它的目标是自动生成处理查询的最有效方法。
我建议你做关于DAG的研究,催化剂,逻辑和物理计划...
对于你的情况,我想你会有这样的东西:

val secondStepResults = {
//do the step 2 transformations
//and to the persist here in order to save the dataframe in memory/disk depending on the use case and cluster configurations
}

保存到表A

secondStepResults
   .write
   .mode(saveMode)
   .saveAsTable("TableA")

将更改应用到您的分区,从步骤2的结果

val secondStepResultsAfterChanges = secondStepResults
   .map(df => process(df))

保存到表B

secondStepResultsAfterChanges 
   .write
   .mode(saveMode)
   .saveAsTable("TableB")
yyhrrdl8

yyhrrdl82#

假设您有以下工作流(您正在重复大多数操作):

|-> export (/path1)
read -> select -> rename -|
                          |-> group by -> export (/path2)

在这种情况下,最好在rename操作时对内存中的数据集执行cache()persist()操作。这两个操作将DataFrame存储在内存中,这可以显著加快对该数据的重复访问。如果您有一个处理小数据的管道,那么缓存或持久化可能只会产生不必要的开销。
旁注:缓存或持久化会消耗大量内存,当您有一个将被多次使用的大型DataFrame时,这是最有益的,因此您需要确保您有足够的资源。当你persist()cache()一个DataFrame时,你可以显式地specify the storage level(内存、磁盘、内存+磁盘等)。
回答你的问题:

So If I do that, will step 1 and 2 happen again while writing for TableB once writing in TableA is completed?

在这种情况下,Spark将read, select and rename两次,因为它遵循懒惰的计算策略,这意味着计算不会立即执行。这就是缓存发挥作用的地方,因为在管道的执行中,产生的DataFrame将存储在内存中,因此在第二次执行时,所有内容都将被跳过到缓存点。

相关问题