我想知道spark.sql.autoBroadcastJoinThreshold
属性对于在所有工作节点上广播较小的表是否有用(同时进行连接),即使连接方案使用API连接而不是使用SparkSQL。
如果我的较大表为250 Gigs,较小表为20 Gigs,我是否需要设置此配置:spark.sql.autoBroadcastJoinThreshold
= 21 Gigs(可能),以便将整个表/Dataset
发送到所有工作节点?
示例:
*数据集API联接
val result = rawBigger.as("b").join(
broadcast(smaller).as("s"),
rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID),
"left_outer"
)
*SQL语言
select *
from rawBigger_table b, smaller_table s
where b.campign_id = s.campaign_id;
3条答案
按热度按时间xu3bshqb1#
首先,
spark.sql.autoBroadcastJoinThreshold
和broadcast
提示是独立的机制。即使autoBroadcastJoinThreshold
被禁用,设置broadcast
提示也将优先。Spark将使用
autoBroadcastJoinThreshold
并自动广播数据:一个三个三个一个
当我们禁用自动广播时,Spark将使用标准的
SortMergeJoin
:但可以强制使用
BroadcastHashJoin
,并附带broadcast
提示:SQL有自己的提示格式(类似于配置单元中使用的格式):
因此,为了回答您的问题-
autoBroadcastJoinThreshold
在使用Dataset
API时适用,但在使用显式broadcast
提示时不相关。此外,广播大型对象不太可能提供任何性能提升,并且在实践中经常会降低性能并导致稳定性问题。请记住,广播对象必须首先被提取到驱动程序,然后发送到每个工作者,最后加载到内存中。
kiayqfof2#
只是为了分享更多的细节(从代码),以伟大的答案从@user6910411。
引用源代码(格式化我的):
通过将该值设置为,可以禁用广播。
请注意,当前统计信息仅支持运行了
ANALYZE TABLE COMPUTE STATISTICS noscan
命令的配置单元元存储表,以及直接在数据文件上计算统计信息的基于文件的数据源表。spark.sql.autoBroadcastJoinThreshold
默认为10M(即10L * 1024 * 1024
),Spark将检查要使用什么连接(请参见JoinSelection执行规划策略)。有6个不同的连接选择,其中包括广播(使用
BroadcastHashJoinExec
或BroadcastNestedLoopJoinExec
物理操作符)。当存在连接密钥并且满足以下条件之一时,将选择
BroadcastHashJoinExec
:spark.sql.autoBroadcastJoinThreshold
spark.sql.autoBroadcastJoinThreshold
当存在no连接密钥并且
BroadcastHashJoinExec
的上述条件之一成立时,将选择BroadcastNestedLoopJoinExec
。换句话说,Spark将自动选择正确的连接,包括基于
spark.sql.autoBroadcastJoinThreshold
属性(以及其他要求)的BroadcastHashJoinExec
,以及连接类型。6vl6ewon3#
有一个限制,即可以广播的DF的最大大小是8G。您可以拆分 Dataframe ,然后基于您的lkp逻辑以迭代方式LKP子 Dataframe 。