使用spark sql查询按使用情况进行群集

1qczuiv0  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(301)

我最近被介绍到sparksql。我在什么地方读到关于使用 CLUSTER BYjoin 要改进的列(在联接之前) join 性能。例子:

create temporary view prod as
select id, name
from product
cluster by id; 

create temporary view cust as
select cid, pid, cname
from customer
cluster by pid;

select c.id, p.name, c.name 
from prod p
join cust c
on p.id = c.pid;

有谁能解释一下在哪些情况下应该使用相同的方法吗?我知道对于join,数据是无序的。那有什么好处呢 CLUSTER BY 引入,因为它也会洗牌数据?
谢谢。

gt0wga4j

gt0wga4j1#

spark将识别集群并洗牌数据。但是,如果您在后面的查询中使用相同的列,从而导致混乱,spark可能会重新使用交换。

yr9zkbsy

yr9zkbsy2#

如果您使用sql接口,就可以不用使用df接口就可以完成任务。 Cluster By 同:

df.repartition($"key", n).sortWithinPartitions()

由于延迟求值,spark将看到连接,并知道您表示希望通过键重新分区-通过sql,而不是像上面的语句一样-因此它只是一个相当于相同内容的接口。使只停留在sql模式更容易。你可以混合。
如果您不这样做,那么spark将为您做(通常),并应用当前的shuffle partitions参数。

SET spark.sql.shuffle.partitions = 2
SELECT * FROM df CLUSTER BY key

同:

df.repartition($"key", 2).sortWithinPartitions()
spark.sql('''SELECT /*+ REPARTITION(col,..) */ cols... from table''')

更新
以下情况不适用于联接:

val df = spark.sql(""" SELECT /*+ REPARTITION(30, c1) */ T1.c1, T1.c2, T2.c3
                         FROM T1, T2   
                        WHERE T1.c1 = T2.c1
                   """)

这样做的目的是在处理连接后重新分区。join将使用t1和t2上设置的较高的分区num,如果没有显式设置,则使用shuffle分区。

相关问题