使用apachespark连接两个大表

wfauudbj  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(326)

我想使用spark通过特定的互键连接两个非常大的表,我试着理解这样做的最佳方式是什么。
比如说:
表1包含900m行和~100列
表2包含600m行和~200列。
我们不能使用“广播连接”,表很大,不能广播。
我想使用在两个表中都存在的互“id”列联接(内部联接)表,另外,我知道id列在两个表中包含相同的值,其中一个表中没有id值,但另一个表中不存在id值。
我能想到的理想方法是将我的每个表“划分”为包含相同“id”值的分区/存储桶,并将它们发送给相同的执行器,该执行器将以集群中最小的数据洗牌来计算联接结果。
我的问题是:
如果我对每个表使用例如.repartition(5,'id'),那么5个分区中的每个分区都将包含相同的'id'值(只要我们在这两个中有相同的'id'值)
例如:

df1
+---+---+------+
|age| id|  name|
+---+---+------+
|  5|  1| David|
| 50|  2|  Lily|
| 10|  3|   Dan|
| 15|  4|Nicole|
| 16|  5|  Dana|
| 19|  6|   Ron|
| 20|  7| Alice|
| 22|  8|  Nora|
| 45|  9|  Sara|
| 70| 10| Aaron|
+---+---+------+

df2
+---+-----+
| id|price|
+---+-----+
|  1| 30.8|
|  1| 40.3|
|  2|100.0|
|  2| 30.1|
|  3| 99.0|
|  3|102.0|
|  4| 81.2|
|  4| 91.2|
|  5| 73.4|
|  6| 22.2|
|  7|374.4|
|  8|669.7|
|  9|  4.8|
| 10|35.38|
+---+-----+

df1.repartition(5,'id')
df2.repartition(5,'id')

如果df1分区是:[id=1,id=2],[id=3,id=4],[id=5,id=6],[id=7,id=8],[id=9,id=10]
df2是否也必须如此?
如果我以相同的方式使用'bucketby',我会在表的bucket中得到相同的'id'值吗?
spark会将正确的分区发送给同一个执行器吗?我的意思是,包含表1的[id=1,id=2]的分区和包含表2的[id=1,id=2]的分区将被发送到连接的同一个执行器。
如果我遗漏了什么,或者您可以推荐另一种方法,在我提到的假设下连接两个大表,这将非常有用。

envsm3lx

envsm3lx1#

是的,它必须是这样的,否则整个加入的范例就不可靠了。
你的意思是真正的工人-有执行者的机器。
单独重新分配不适合循环使用。
范围分区也起作用。检查以确定,但假设分区值的分布与但书相同。
这一切都是在懒惰评估的前提下进行的。
bucketby可以使用,但更多的是用于持久化到磁盘并在下一个应用程序中使用。
同样,您不必担心协助,因为lazy eval意味着优化器有机会解决所有问题—分配给哪个工作者。但这是在一个较低层次的细节,抽象。

vlf7wbxs

vlf7wbxs2#

看看这个答案。
tldr:如果你想加入它们一次,并且这是重新分区的唯一目的,那么只需加入它们。

相关问题