Apache Spark autoBroadcastJoinThreshold是否适用于使用数据集连接运算符的连接?

ycl3bljg  于 2023-03-09  发布在  Apache
关注(0)|答案(3)|浏览(186)

我想知道spark.sql.autoBroadcastJoinThreshold属性对于在所有工作节点上广播较小的表是否有用(同时进行连接),即使连接方案使用API连接而不是使用SparkSQL。
如果我的较大表为250 Gigs,较小表为20 Gigs,我是否需要设置此配置:spark.sql.autoBroadcastJoinThreshold = 21 Gigs(可能),以便将整个表/Dataset发送到所有工作节点?

示例
*数据集API联接

val result = rawBigger.as("b").join(
  broadcast(smaller).as("s"),
  rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID), 
  "left_outer"
)

*SQL语言

select * 
from rawBigger_table b, smaller_table s
where b.campign_id = s.campaign_id;
xu3bshqb

xu3bshqb1#

首先,spark.sql.autoBroadcastJoinThresholdbroadcast提示是独立的机制。即使autoBroadcastJoinThreshold被禁用,设置broadcast提示也将优先。

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)

Spark将使用autoBroadcastJoinThreshold并自动广播数据:
一个三个三个一个
当我们禁用自动广播时,Spark将使用标准的SortMergeJoin

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 100, step=1, splits=Some(8))
   +- *Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

但可以强制使用BroadcastHashJoin,并附带broadcast提示:

df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
   :- *Range (0, 100, step=1, splits=Some(8))
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *Range (0, 100, step=1, splits=Some(8))

SQL有自己的提示格式(类似于配置单元中使用的格式):

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
 "SELECT  /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

因此,为了回答您的问题-autoBroadcastJoinThreshold在使用Dataset API时适用,但在使用显式broadcast提示时不相关。
此外,广播大型对象不太可能提供任何性能提升,并且在实践中经常会降低性能并导致稳定性问题。请记住,广播对象必须首先被提取到驱动程序,然后发送到每个工作者,最后加载到内存中。

kiayqfof

kiayqfof2#

只是为了分享更多的细节(从代码),以伟大的答案从@user6910411。
引用源代码(格式化我的):

    • spark. sql. autoBroadcastJoinThreshold**配置执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。

通过将该值设置为,可以禁用广播。
请注意,当前统计信息仅支持运行了ANALYZE TABLE COMPUTE STATISTICS noscan命令的配置单元元存储表,以及直接在数据文件上计算统计信息的基于文件的数据源表。
spark.sql.autoBroadcastJoinThreshold默认为10M(即10L * 1024 * 1024),Spark将检查要使用什么连接(请参见JoinSelection执行规划策略)。
6个不同的连接选择,其中包括广播(使用BroadcastHashJoinExecBroadcastNestedLoopJoinExec物理操作符)。
当存在连接密钥并且满足以下条件之一时,将选择BroadcastHashJoinExec

  • Join是可以广播的CROSS、INNER、LEFT ANTI、LEFT OUTER、LEFT SEMI和右连接边之一,即大小小于spark.sql.autoBroadcastJoinThreshold
  • Join为CROSS、INNER、RIGHT OUTER之一,左连接边可以广播,即大小小于spark.sql.autoBroadcastJoinThreshold

当存在no连接密钥并且BroadcastHashJoinExec的上述条件之一成立时,将选择BroadcastNestedLoopJoinExec
换句话说,Spark将自动选择正确的连接,包括基于spark.sql.autoBroadcastJoinThreshold属性(以及其他要求)的BroadcastHashJoinExec,以及连接类型。

6vl6ewon

6vl6ewon3#

有一个限制,即可以广播的DF的最大大小是8G。您可以拆分 Dataframe ,然后基于您的lkp逻辑以迭代方式LKP子 Dataframe 。

相关问题