Repartition and shuffle in a Spark physical plan

cwtwac6a  posted on 2021-05-27 in Spark

I am trying to understand Spark (2.4) physical plans. We interact with Spark through the SQL API.
I am using the SQL below. It performs an aggregation in step 1 and a join in the next step. My intention was to repartition the source table ahead of step 1 so that its Exchange could be reused and the shuffles (Exchanges) in the following steps avoided, but it is not working the way I expected. Can you help me understand where I am going wrong?

    create or replace temporary view prsn_dtl as
    select
    policy_num,
    prsn_id,
    eff_dt
    from db.person_details
    cluster by policy_num;

    create or replace temporary view plcy_dtl as
    select
    policy_num,
    role_desc,
    prsn_actv_flg
    from plcy_detail;

    create or replace temporary view my_keys as
    select
    policy_num as policy_num,
    prsn_id as prsn_id,
    max(eff_dt) as eff_dt
    from prsn_dtl
    group by 1, 2;

    select
    keys.policy_num,
    keys.prsn_id,
    keys.eff_dt,
    plcy.role_desc,
    plcy.prsn_actv_flg
    from my_keys keys
    inner join plcy_dtl plcy
    on keys.policy_num = plcy.policy_num;
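
(For reference, a plan like the one shown in the update further down can be printed by prefixing the final query with EXPLAIN; a minimal sketch, reusing the views defined above:)

    explain
    select
    keys.policy_num,
    keys.prsn_id,
    keys.eff_dt,
    plcy.role_desc,
    plcy.prsn_actv_flg
    from my_keys keys
    inner join plcy_dtl plcy
    on keys.policy_num = plcy.policy_num;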

In the DAG representation I found 3 Exchanges:
Step 1) hashpartitioning(policy_num#92, 200), coming from the cluster by
Step 2) hashpartitioning(policy_num#163, prsn_id#164, 200), for the Aggregate operator
Step 3) hashpartitioning(policy_num#163), just before the sort merge join
My question is:
Why does the Exchange from step 1 above (the cluster by) not get propagated down and reused in step 3, before the sort merge join?
My expectation was that Spark would reuse the Exchange from step 1 (the cluster by) and not add another Exchange before the SMJ in step 3, since both are repartitioned on policy_num.
Can someone explain where I went wrong?
Update: physical plan:

CollectLimit 1
        +- *(6) Project [policy_num#836, prsn_id#837, eff_dt#838, role_desc#304, prsn_actv_flg#306]
          +- *(6) SortMergeJoin [policy_num#836], [policy_num#300], Inner 
    :       +- *(3) Sort [policy_num#836 ASC NULLS FIRST], false, 0
    :        +- Exchange hashpartitioning(policy_num#836, 200)
    :         +- *(2) HashAggregate(keys=[policy_num#801, prsn_id#802], functions=[max(eff_dt)], output=[policy_num#836, prsn_id#837, eff_dt#838])
    :           +- *(2) HashAggregate(keys=[policy_num#801, prsn_id#802], functions=[partial_max(eff_dt#803)], output=[policy_num#801, prsn_id#802, max#847])
    :             +- *(2) Sort [policy_num#801 ASC NULLS FIRST], false, 0 
    :               +- Exchange hashpartitioning(policy_num#801, 200)
    :                +- *(1) Project [policy_num#801, prsn_id#802, eff_dt#803]
    :                  +- *(1) Filter isnotnull(policy_num#801)     
    :                   +- *(1) FileScan parquet testdb.prsn_details[policy_num#801,prsn_id#802,eff_dt#803] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test_db.prsn_details/20200505..., PartitionFilters: [], PushedFilters: [IsNotNull(policy_num)], ReadSchema: struct<policy_num:string, prsn_id:string, eff_dt:date>
               +- *(5) Sort [policy_num#300 ASC NULLS FIRST], false, 0  
                +- Exchange hashpartitioning(policy_num#300, 200)
                  +- *(4) Project [policy_num#300, role_desc#304, prsn_actv_flg#306]
                   +- *(4) Filter (((trim(prsn_actv_flg#306, None) = ) ................
                      +- *(4) FileScan parquet plcy_detail[policy_num#300,role_desc#304,prsn_actv_flg#306] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test_db.plcy_details/20200505..., PartitionFilters: [], PushedFilters: [IsNotNull(policy_num)], ReadSchema: struct<policy_num:string, role_desc:string, prsn_actv_flg:string>

Update 2 (solution): I removed the column renames (AS) from the GROUP BY query, and now the exchange is reused in the query plan:

create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;
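
If differently named key columns are still needed downstream, one option (a sketch only; the output names pol_num and person_id are placeholders, not from the original post) is to alias them in the final select after the join, where the rename no longer sits between the cluster by and the join:

select
keys.policy_num as pol_num,   -- placeholder name; rename happens above the join
keys.prsn_id as person_id,    -- placeholder name
keys.eff_dt,
plcy.role_desc,
plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;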

Thanks.

gc0ot86w1#

Since you are grouping by policy_num and prsn_id, you get a new shuffle for that aggregation and then shuffle again for the join. In that case the initial cluster by is not needed. You have to shuffle at least twice: 1) for the group by in my_keys, and 2) for the join.
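
Roughly, the two shuffles map onto the query like this (a sketch of what the answer describes; the partitioning keys are paraphrased from the plan above, not copied from it):

-- shuffle 1: hash partitioning on the grouping keys (policy_num, prsn_id) for the aggregation
select policy_num, prsn_id, max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;

-- shuffle 2: hash partitioning on policy_num on each side of the sort merge join
select keys.policy_num, keys.prsn_id, keys.eff_dt, plcy.role_desc, plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;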

pgpifvop2#

Is the query you posted exactly the one you are running, or is it part of a larger problem? Spark will reuse the exchange if the repartition keys are a subset of the aggregate keys. The problem here is likely the column renaming between the queries. If your query has aliases on those columns, you may want to remove them and check again.
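
A minimal sketch of the contrast this answer describes, using the my_keys view from the question (the comments state the expected behaviour, not verified plan output):

-- grouping columns aliased: the grouping keys get new attribute ids, and an extra
-- Exchange hashpartitioning(policy_num, 200) shows up before the sort merge join
create or replace temporary view my_keys as
select policy_num as policy_num, prsn_id as prsn_id, max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;

-- no aliases on the grouping columns: the partitioning from the cluster by carries
-- through the aggregate, and the extra Exchange before the join goes away
create or replace temporary view my_keys as
select policy_num, prsn_id, max(eff_dt) as eff_dt
from prsn_dtl
group by 1, 2;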
