I am trying to understand Spark (2.4) physical plans. We interact with Spark through the SQL API.
I am using the SQL below. It has an aggregation in step 1 and a join operation in the next step. My intention was to repartition the source table before step 1 so that I could reuse that Exchange and avoid further shuffles (Exchanges) in the later steps, but it is not working as I expected. Can you help me understand where I went wrong?
create or replace temporary view prsn_dtl as
select
policy_num,
prsn_id,
eff_dt
from db.person_details
cluster by policy_num;
create or replace temporary view plcy_dtl as
select
policy_num,
role_desc,
prsn_actv_flg
from plcy_detail;
create or replace temporary view my_keys as
select
policy_num as policy_num,
prsn_id as prsn_id,
max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;
select
keys.policy_num,
keys.prsn_id,
keys.eff_dt,
plcy.role_desc,
plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;
In the DAG representation I found 3 Exchanges:
Step 1) The first one is hashpartitioning(policy_num#92, 200), due to the cluster by.
Step 2) The second one is on the Aggregate operator: hashpartitioning(policy_num#163, prsn_id#164, 200).
Step 3) Finally, hashpartitioning(policy_num#163) before the sort merge join.
My question is:
Why doesn't the Exchange from step 1 above (the one produced by the cluster by) get propagated down and reused in step 3, before the sort merge join?
My expectation was that Spark would reuse the Exchange from step 1 (the cluster by) and would not add another Exchange (before the SMJ) in step 3, since both are repartitioned on policy_num.
Can anyone explain where I went wrong?
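For reference, a plan like the one in the update below can be printed directly from the SQL API; this is only a sketch, assuming the views defined above exist (EXPLAIN is the standard Spark SQL statement, nothing project-specific):
-- Print the physical plan so each inserted Exchange (shuffle) is visible.
explain
select
keys.policy_num,
keys.prsn_id,
keys.eff_dt,
plcy.role_desc,
plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;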
Update: physical plan:
CollectLimit 1
+- *(6) Project [policy_num#836, prsn_id#837, eff_dt#838, role_desc#304, prsn_actv_flg#306]
+- *(6) SortMergeJoin [policy_num#836], [policy_num#300], Inner
: +- *(3) Sort [policy_num#836 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(policy_num#836, 200)
: +- *(2) HashAggregate(keys=[policy_num#801, prsn_id#802], functions=[max(eff_dt)], output=[policy_num#836, prsn_id#837, eff_dt#838])
: +- *(2) HashAggregate(keys=[policy_num#801, prsn_id#802], functions=[partial_max(eff_dt#803)], output=[policy_num#801, prsn_id#802, max#847])
: +- *(2) Sort [policy_num#801 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(policy_num#801, 200)
: +- *(1) Project [policy_num#801, prsn_id#802, eff_dt#803]
: +- *(1) Filter isnotnull(policy_num#801)
: +- *(1) FileScan parquet testdb.prsn_details[policy_num#801,prsn_id#802,eff_dt#803] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test_db.prsn_details/20200505..., PartitionFilters: [], PushedFilters: [IsNotNull(policy_num)], ReadSchema: struct<policy_num:string, prsn_id:string, eff_dt:date>
+- *(5) Sort [policy_num#300 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(policy_num#300, 200)
+- *(4) Project [policy_num#300, role_desc#304, prsn_actv_flg#306]
+- *(4) Filter (((trim(prsn_actv_flg#306, None) = ) ................
+- *(4) FileScan parquet plcy_detail[policy_num#300,role_desc#304,prsn_actv_flg#306] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test_db.plcy_details/20200505..., PartitionFilters: [], PushedFilters: [IsNotNull(policy_num)], ReadSchema: struct<policy_num:string, role_desc:string, prsn_actv_flg:string>
Update 2 (solution): Removed the column renames (AS) from the GROUP BY query, and now the Exchange is reused in the query plan:
create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;
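As a quick check (again only a sketch, assuming my_keys has been re-created with the definition above), re-running EXPLAIN on the same join shows the difference: the Exchange hashpartitioning(policy_num..., 200) that previously sat directly above the left-side Sort of the SortMergeJoin should no longer be inserted, since the aggregate's output now carries the original policy_num attribute:
explain
select keys.policy_num, keys.prsn_id, keys.eff_dt, plcy.role_desc, plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;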
Thanks
2 Answers
Answer 1:
Because you are grouping by policy_num and prsn_id, you get a new shuffle there, and then another one for the join. In that case the initial cluster by is not needed. You have to shuffle at least twice: 1) for the group by in my_keys, 2) for the join.
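A minimal sketch of the simplification suggested here: drop the cluster by from the first view and let Spark plan the two unavoidable shuffles itself (the other views stay unchanged):
create or replace temporary view prsn_dtl as
select
policy_num,
prsn_id,
eff_dt
from db.person_details;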
Answer 2:
Is the query you specified exactly the one you mention here, or is it part of a bigger problem? Spark will reuse an Exchange if the aggregate keys are a subset of the repartition keys. The problem may be the columns being renamed between the queries. If your query has aliases, you may want to remove them and check again.
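To illustrate the aliasing point with the views from the question: even a self-alias such as policy_num AS policy_num produces a new output attribute with a new expression ID (policy_num#836 instead of policy_num#801 in the plan above), which appears to be why the partitioning coming out of the aggregate is no longer recognized at the join and an extra Exchange is added; passing the grouping columns through unaliased avoids that:
-- Variant that triggers the extra Exchange before the join: grouping columns are re-aliased.
create or replace temporary view my_keys as
select
policy_num as policy_num,
prsn_id as prsn_id,
max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;
-- Variant where the existing partitioning is kept: grouping columns are passed through as-is.
create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;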