我有一个情况,我需要在完成所有转换后在两个不同的表中写入相同的数据。过程类似于:1.阅读来源1.应用转换1.在TableA中写入现在,我想在TableB中也写入相同的数据,只是在分区中有轻微的变化(例如:删除我在TableA中写入时使用的一些分区)。那么如果我这样做,在TableA中写入完成后,在写入TableB时,步骤1和2是否会再次发生?做这件事的有效方法是什么?谢谢!
TableA
TableB
zengzsys1#
执行计划是被执行以将查询语言语句转换成由DAG表示的优化的逻辑和物理操作的集合的操作的集合。在Spark中,我们有一个名为Catalyst的优化器,它的目标是自动生成处理查询的最有效方法。我建议你做关于DAG的研究,催化剂,逻辑和物理计划...对于你的情况,我想你会有这样的东西:
val secondStepResults = { //do the step 2 transformations //and to the persist here in order to save the dataframe in memory/disk depending on the use case and cluster configurations }
保存到表A
secondStepResults .write .mode(saveMode) .saveAsTable("TableA")
将更改应用到您的分区,从步骤2的结果
val secondStepResultsAfterChanges = secondStepResults .map(df => process(df))
保存到表B
secondStepResultsAfterChanges .write .mode(saveMode) .saveAsTable("TableB")
yyhrrdl82#
假设您有以下工作流(您正在重复大多数操作):
|-> export (/path1) read -> select -> rename -| |-> group by -> export (/path2)
在这种情况下,最好在rename操作时对内存中的数据集执行cache()或persist()操作。这两个操作将DataFrame存储在内存中,这可以显著加快对该数据的重复访问。如果您有一个处理小数据的管道,那么缓存或持久化可能只会产生不必要的开销。旁注:缓存或持久化会消耗大量内存,当您有一个将被多次使用的大型DataFrame时,这是最有益的,因此您需要确保您有足够的资源。当你persist()或cache()一个DataFrame时,你可以显式地specify the storage level(内存、磁盘、内存+磁盘等)。回答你的问题:
rename
cache()
persist()
So If I do that, will step 1 and 2 happen again while writing for TableB once writing in TableA is completed?
在这种情况下,Spark将read, select and rename两次,因为它遵循懒惰的计算策略,这意味着计算不会立即执行。这就是缓存发挥作用的地方,因为在管道的执行中,产生的DataFrame将存储在内存中,因此在第二次执行时,所有内容都将被跳过到缓存点。
read, select and rename
2条答案
按热度按时间zengzsys1#
执行计划是被执行以将查询语言语句转换成由DAG表示的优化的逻辑和物理操作的集合的操作的集合。
在Spark中,我们有一个名为Catalyst的优化器,它的目标是自动生成处理查询的最有效方法。
我建议你做关于DAG的研究,催化剂,逻辑和物理计划...
对于你的情况,我想你会有这样的东西:
保存到表A
将更改应用到您的分区,从步骤2的结果
保存到表B
yyhrrdl82#
假设您有以下工作流(您正在重复大多数操作):
在这种情况下,最好在
rename
操作时对内存中的数据集执行cache()
或persist()
操作。这两个操作将DataFrame存储在内存中,这可以显著加快对该数据的重复访问。如果您有一个处理小数据的管道,那么缓存或持久化可能只会产生不必要的开销。旁注:缓存或持久化会消耗大量内存,当您有一个将被多次使用的大型DataFrame时,这是最有益的,因此您需要确保您有足够的资源。当你
persist()
或cache()
一个DataFrame时,你可以显式地specify the storage level(内存、磁盘、内存+磁盘等)。回答你的问题:
在这种情况下,Spark将
read, select and rename
两次,因为它遵循懒惰的计算策略,这意味着计算不会立即执行。这就是缓存发挥作用的地方,因为在管道的执行中,产生的DataFrame将存储在内存中,因此在第二次执行时,所有内容都将被跳过到缓存点。