我试图用一个30行的Dataframe内部连接一百万行的Dataframe,并且两个表都有相同的连接键,spark试图执行排序合并连接,因此我的所有数据最终都在同一个执行器中,例如,作业永远不会完成
DF1(million rows dataframe registered as TempView DF1)
+-------+-----------+
| id | price |
+-------+-----------+
| 1 | 30 |
| 1 | 10 |
| 1 | 12 |
| 1 | 15 |
+-------+-----------+
DF2(30 rows dataframe registered as TempView DF2)
+-------+-----------+
| id | Month |
+-------+-----------+
| 1 | Jan |
| 1 | Feb |
+-------+-----------+
我试着跟着
广播
spark.sql("Select /*+ BROADCAST(Df2) */ Df1.* from Df1 inner join Df2 on Df1.id=Df2.id").createTempView("temp")
重新分区
Df1.repartition(200)
查询执行计划
00 Project [.......................]
01 +- SortMergeJoin [.............................],Inner
02 :- Project [.............................]
03 : +-Filter is notnull[JoinKey]
04 : +- FileScan orc[..........................]
05 +-Project [.............................]
06 +-BroadcastHashJoin [..........................], LeftOuter, BuildRight
07 :- BroadCastHashJoin [......................],LeftSemi, BuildRight
分区数的输出
spark.table("temp").withColumn("partition_id",spark_partition_id).groupBy
("partition_id").count
+-------+---------------+
| 21 |300,00,000 |
+-------+---------------+
即使我重新分区/广播数据,spark也会在连接时将所有数据带到一个执行器,并且数据会在一个执行器上发生偏移。我还尝试将spark.sql.join.prefersortmergejoin关闭为false。但我仍然看到我的数据在一个执行者身上被扭曲。有人能帮我吗?
2条答案
按热度按时间ev7lccsx1#
就这样做,效果很好。数据按原样,没有分区。
退货:
这依赖于默认的并行性,8依赖于ce databricks集群。
广播在任何情况下都应该起作用,因为小table很小。
即使这样:
它与50个分区并行工作。这是循环分区,意味着集群将获得分布在至少有n个执行者的n个工作进程上的分区。它不是散列的,通过指定一个列来调用散列,如果所有值都相同,则会导致偏斜。i、 e.所有数据在一个worker上的相同分区。
qed:所以,不是所有的执行器都只在一个执行器上工作,除非你的spark应用程序或散列应用程序只有一个执行器。
之后,我用local[4]在我的实验笔记本电脑上运行,数据由4个内核提供服务,因此有4个执行器。不加盐,平行4。所以,奇怪的是你不能得到,除非你散列。
如果在一个真正的集群上,您可以看到4个并行任务,因此并非所有任务都在1个执行器上。
fcy6dtqo2#
为什么所有数据都移动到一个执行者?如果在df1中只有相同的id(id:1),则使用id加入df2。根据hashpartitioner,id=1的数据将始终一起移动。
你加入了吗?在spark ui中查看