并行处理—spark是逐行还是一起执行转换?

9nvpjoqh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(365)

我试图理解Spark和有以下代码在哪里 df,df2,df3,df4 are Dataset<Row> ```
df=df.join(df2,"ID");
df4=df4.join(df3,"ID");
df=df.union(df4);
long count=df.count();

我的问题是这种转变是如何发生的?在上面的示例中,联合是否等待两个连接完全完成(即完成所有行的连接),然后启动联合?或者它是以流水线的方式逐行进行的,其中一行的连接一旦完成,联合变换就在其上开始(即使其他行的连接仍在进行)?
我试着搜索这个,但找不到任何答案。
axr492tv

axr492tv1#

使用 explain() 看看会发生什么。

df.explain()

== Physical Plan ==
Union
:- *(5) Project [ID#3599L, VALUE#3600, VALUE#3604]
:  +- *(5) SortMergeJoin [ID#3599L], [ID#3603L], Inner
:     :- *(2) Sort [ID#3599L ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(ID#3599L, 200), true, [id=#1332]
:     :     +- *(1) Filter isnotnull(ID#3599L)
:     :        +- *(1) Scan ExistingRDD[ID#3599L,VALUE#3600]
:     +- *(4) Sort [ID#3603L ASC NULLS FIRST], false, 0
:        +- ReusedExchange [ID#3603L, VALUE#3604], Exchange hashpartitioning(ID#3599L, 200), true, [id=#1332]
+- *(10) Project [ID#3599L, VALUE#3600, VALUE#3609]
   +- *(10) SortMergeJoin [ID#3599L], [ID#3608L], Inner
      :- *(7) Sort [ID#3599L ASC NULLS FIRST], false, 0
      :  +- ReusedExchange [ID#3599L, VALUE#3600], Exchange hashpartitioning(ID#3599L, 200), true, [id=#1332]
      +- *(9) Sort [ID#3608L ASC NULLS FIRST], false, 0
         +- ReusedExchange [ID#3608L, VALUE#3609], Exchange hashpartitioning(ID#3599L, 200), true, [id=#1332]

sort、sortmergejoin、project和union。这发生在你采取行动时,比如 count() 在那之前,spark将计划如何继续。

相关问题