在spark中不添加join键的情况下处理dataskew

jei2mxaa  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(471)

我试图用一个30行的Dataframe内部连接一百万行的Dataframe,并且两个表都有相同的连接键,spark试图执行排序合并连接,因此我的所有数据最终都在同一个执行器中,例如,作业永远不会完成

DF1(million rows dataframe registered as TempView DF1)
+-------+-----------+
|   id  |  price    | 
+-------+-----------+
|    1  |   30      |
|    1  |   10      |
|    1  |   12      |
|    1  |   15      |
+-------+-----------+
DF2(30 rows dataframe registered as TempView DF2)
+-------+-----------+
|   id  |  Month    | 
+-------+-----------+
|    1  |   Jan     |
|    1  |   Feb     |
+-------+-----------+

我试着跟着
广播

spark.sql("Select /*+ BROADCAST(Df2) */ Df1.* from Df1 inner join Df2 on Df1.id=Df2.id").createTempView("temp")

重新分区

Df1.repartition(200)

查询执行计划

00 Project [.......................]
01 +- SortMergeJoin [.............................],Inner
02    :- Project [.............................]
03    :  +-Filter is notnull[JoinKey]
04    :    +- FileScan orc[..........................]
05    +-Project [.............................]
06      +-BroadcastHashJoin [..........................], LeftOuter, BuildRight
07        :- BroadCastHashJoin [......................],LeftSemi, BuildRight

分区数的输出

spark.table("temp").withColumn("partition_id",spark_partition_id).groupBy
("partition_id").count
+-------+---------------+
|    21 |300,00,000     |
+-------+---------------+

即使我重新分区/广播数据,spark也会在连接时将所有数据带到一个执行器,并且数据会在一个执行器上发生偏移。我还尝试将spark.sql.join.prefersortmergejoin关闭为false。但我仍然看到我的数据在一个执行者身上被扭曲。有人能帮我吗?

ev7lccsx

ev7lccsx1#

就这样做,效果很好。数据按原样,没有分区。

import org.apache.spark.sql.functions.broadcast
 // Simulate some data
 val df1 = spark.range(1000000).rdd.map(x => (1, "xxx")).toDF("one", "val")
 val df2 = spark.range(30).rdd.map(x => (1, "yyy")).toDF("one", "val2")
 // Data is as is, has no partitioning applied

 val df3 = df1.join(broadcast(df2), "one")  
 df3.count // An action to kick it all along

 // Look at final counts of partitions
 val rddcounts = df3.rdd.mapPartitions(iter => Array(iter.size).iterator, true) 
 rddcounts.collect

退货:

res26: Array[Int] = Array(3750000, 3750000, 3750000, 3750000, 3750000, 3750000, 3750000, 3750000)

这依赖于默认的并行性,8依赖于ce databricks集群。
广播在任何情况下都应该起作用,因为小table很小。
即使这样:

val df = spark.range(1000000).rdd.map(x => (1, "xxx")).toDF("one", "val")
val df1 = df.repartition(50)

它与50个分区并行工作。这是循环分区,意味着集群将获得分布在至少有n个执行者的n个工作进程上的分区。它不是散列的,通过指定一个列来调用散列,如果所有值都相同,则会导致偏斜。i、 e.所有数据在一个worker上的相同分区。
qed:所以,不是所有的执行器都只在一个执行器上工作,除非你的spark应用程序或散列应用程序只有一个执行器。
之后,我用local[4]在我的实验笔记本电脑上运行,数据由4个内核提供服务,因此有4个执行器。不加盐,平行4。所以,奇怪的是你不能得到,除非你散列。


如果在一个真正的集群上,您可以看到4个并行任务,因此并非所有任务都在1个执行器上。

fcy6dtqo

fcy6dtqo2#

为什么所有数据都移动到一个执行者?如果在df1中只有相同的id(id:1),则使用id加入df2。根据hashpartitioner,id=1的数据将始终一起移动。
你加入了吗?在spark ui中查看

相关问题