Optimizing a cross join in Spark SQL

nwlls2ji · posted 2021-05-27 in Spark

Is it possible to optimize a cross join in Spark SQL? The requirement is to populate a band_id column based on age ranges defined in another table. So far I have been able to do this with a CROSS JOIN and a WHERE clause, but I am hoping there is a better way to write the query and mitigate the performance issue. Can I use a broadcast hint? (SQL provided below.)
Customer table: (10M records)

id | name | age
X1 | John | 22
V2 | Mark | 29
F4 | Peter| 42

Age band table: (10 records)

band_id | low_age | high_age
B123    |  10     | 19
X745    |  20     | 29
P134    |  30     | 39
Q245    |  40     | 50

Expected output:

id | name | age | band_id
X1 | John | 22  | X745
V2 | Mark | 29  | X745
F4 | Peter| 42  | Q245

Query:

select
    a.id, a.name, a.age, b.band_id
from cust a
cross join age_band b
where a.age between b.low_age and b.high_age;
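
The broadcast-hint version I have in mind would look something like this (just a sketch; whether the hint is the right approach here is exactly my question):

select /*+ BROADCAST(b) */
    a.id, a.name, a.age, b.band_id
from cust a
cross join age_band b
where a.age between b.low_age and b.high_age;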

Please advise.


h9vpoimq 1#

Judging by the comments in SparkStrategies.scala, in your case it seems you can, but you do not have to, specify a cross or broadcast hint, because Spark will choose a broadcast nested loop join anyway:


* ...
*   - Broadcast nested loop join (BNLJ):
*       Supports both equi-joins and non-equi-joins.
*       Supports all the join types, but the implementation is optimized for:
*         1) broadcasting the left side in a right outer join;
*         2) broadcasting the right side in a left outer, left semi, left anti or existence join;
*         3) broadcasting either side in an inner-like join.
*       For other cases, we need to scan the data multiple times, which can be rather slow.
* ...
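
To see this for yourself, here is a minimal sketch (assuming a spark-shell session where spark is in scope; the sample rows are taken from the question):

import spark.implicits._

// Sample data mirroring the question's two tables.
val cust = Seq(("X1", "John", 22), ("V2", "Mark", 29), ("F4", "Peter", 42))
  .toDF("id", "name", "age")
val ageBand = Seq(("B123", 10, 19), ("X745", 20, 29), ("P134", 30, 39), ("Q245", 40, 50))
  .toDF("band_id", "low_age", "high_age")
cust.createOrReplaceTempView("cust")
ageBand.createOrReplaceTempView("age_band")

// No cross or broadcast hint: the plan still shows BroadcastNestedLoopJoin,
// because the non-equi join condition rules out sort-merge and shuffled hash joins.
spark.sql("""
  SELECT a.id, a.name, a.age, b.band_id
  FROM cust a
  JOIN age_band b
    ON a.age BETWEEN b.low_age AND b.high_age
""").explain()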

9rbhqvlz 2#

You do not need a cross join; a left join is enough. When I run both, the physical plans differ slightly, and I prefer the latter.

val df3 = spark.sql("""
    SELECT 
        id, name, age, band_id
    FROM 
        cust a
    CROSS JOIN 
        age_band b
    ON 
        age BETWEEN low_age and high_age
""")

df3.explain

== Physical Plan ==
*(3) Project [id#75, name#76, age#77, band_id#97]
+- BroadcastNestedLoopJoin BuildLeft, Cross, ((age#77 >= low_age#98) AND (age#77 <= high_age#99))
   :- BroadcastExchange IdentityBroadcastMode, [id=#157]
   :  +- *(1) Project [id#75, name#76, age#77]
   :     +- *(1) Filter isnotnull(age#77)
   :        +- FileScan csv [id#75,name#76,age#77] Batched: false, DataFilters: [isnotnull(age#77)], Format: CSV, Location: InMemoryFileIndex[file:/test1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(age)], ReadSchema: struct<id:string,name:string,age:int>
   +- *(2) Project [band_id#97, low_age#98, high_age#99]
      +- *(2) Filter (isnotnull(low_age#98) AND isnotnull(high_age#99))
         +- FileScan csv [band_id#97,low_age#98,high_age#99] Batched: false, DataFilters: [isnotnull(low_age#98), isnotnull(high_age#99)], Format: CSV, Location: InMemoryFileIndex[file:/test2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(low_age), IsNotNull(high_age)], ReadSchema: struct<band_id:string,low_age:int,high_age:int>

val df4 = spark.sql("""
    SELECT  /*+ BROADCAST(age_band) */ 
        id, name, age, band_id
    FROM 
        cust a
    LEFT JOIN 
        age_band b
    ON 
        age BETWEEN low_age and high_age
""")

df4.explain

== Physical Plan ==
*(2) Project [id#75, name#76, age#77, band_id#97]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, ((age#77 >= low_age#98) AND (age#77 <= high_age#99))
   :- FileScan csv [id#75,name#76,age#77] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/test1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string,age:int>
   +- BroadcastExchange IdentityBroadcastMode, [id=#192]
      +- *(1) Project [band_id#97, low_age#98, high_age#99]
         +- *(1) Filter (isnotnull(low_age#98) AND isnotnull(high_age#99))
            +- FileScan csv [band_id#97,low_age#98,high_age#99] Batched: false, DataFilters: [isnotnull(low_age#98), isnotnull(high_age#99)], Format: CSV, Location: InMemoryFileIndex[file:/test2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(low_age), IsNotNull(high_age)], ReadSchema: struct<band_id:string,low_age:int,high_age:int>
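
For completeness, the same left join can be written with the DataFrame API, forcing the broadcast explicitly via org.apache.spark.sql.functions.broadcast (a sketch, assuming dfCust and dfBand stand for the two tables loaded as DataFrames):

import org.apache.spark.sql.functions.broadcast

// dfCust (large): id, name, age; dfBand (small): band_id, low_age, high_age.
// broadcast() marks the small side so each executor receives a full copy,
// avoiding a shuffle of the 10M-row customer table.
val result = dfCust.join(
  broadcast(dfBand),
  dfCust("age").between(dfBand("low_age"), dfBand("high_age")),
  "left"
)
result.select("id", "name", "age", "band_id").explain()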
