我正在尝试使用SparkSQL在多个框架上执行广播哈希连接,如此处所示。
在该示例中,(小)DataFrame
通过saveAsTable持久化,然后通过spark SQL(即通过sqlContext.sql("..."))
)进行连接
我遇到的问题是,我需要使用sparkSQL API来构造我的SQL(我用ID列表连接了大约50个表,并且不想手工编写SQL)。
如何通过API告诉spark使用广播哈希连接?问题是,如果我将ID列表(从通过saveAsTable
持久化的表中)加载到DataFrame
中以用于连接,我不清楚Spark是否可以应用广播哈希连接。
3条答案
按热度按时间zsbz8rwp1#
您可以使用
broadcast
函数显式地将DataFrame
标记为足够小以进行广播:Python:
字符串
或广播提示(Spark >= 2.2):
型
Scala:
型
或广播提示(Spark >= 2.2):
型
SQL
您可以使用提示(Spark >= 2.2):
型
或
型
或
型
R(SparkR):
对于
hint
(Spark >= 2.2):型
使用
broadcast
(Spark >= 2.3)型
备注:
如果其中一个结构相对较小,则广播连接非常有用。否则,它的开销可能比完全 Shuffle 大得多。
mum43rcc2#
字符串
shuffledHashJoin:
印度的所有数据将被混洗到每个州的29个键中。2问题:不均匀的分片。3有限的并行性与29个输出分区。
broadcasteHashJoin:
将小RDD广播到所有工作节点。大RDD的并行性仍然保持,甚至不需要shuffle。
PS:图片可能很丑,但信息丰富。
umuewwlo3#
对于广播连接,连接等式的一边被具体化并发送给所有Map器,因此它被认为是Map端连接。
当数据集被具体化并通过网络发送时,它只会带来显著的性能改进,如果它相当小的话。
因此,如果您尝试执行smallDF.join(largeDF)
等等!!!另一个限制是它还需要完全适合每个执行器的内存。它还需要适合驱动程序的内存!**
广播变量使用Torrent协议(即对等协议)在执行器之间共享,Torrent协议的优点是对等体彼此共享文件的块,而不依赖于持有所有块的中央实体。
上面提到的例子足以开始播放广播加入。
注意: 创建后不能修改值,如果尝试修改,只会在一个节点(&node)上修改*