复杂联接(Pyspark)-范围和分类

watbbzwu  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(165)

表1:
| Perl诺|版本|盖|rf_1| rf_2| rf_3| rl_1| rl_2| rl_3|
| --|--|--|--|--|--|--|--|--|
| ABC123| 1 |一|[“base”,“usage”]|[“group”]| null| [“500”,“private”]|[“蓝色”]| null|
| cde111| 1 |一|[“base”,“usage”]|[“age”]|[“protection”,“gold_claim”,”more_more_one_claim”]|[“500”,“private”]|[“9”]|[“Y”,“Y”,“N”]|
| cde222| 1 |一|[“base”,“usage”]|[“group”]| null| [“300”,“business”]|[“gold”]| null|
表2:
| 评级因子1|评级因子2|评级因子3|额定系数a|额定系数cat_B|额定系数c|额定系数起动|额定因数端|额定因数amt|额定因数系数|额定因数率|版本|盖|
| --|--|--|--|--|--|--|--|--|--|--|--|--|
| 基地|使用|null|私人|null| null| 400 | 550 | 50 |0.2| null| 1 |一|
| 基地|使用|null|业务|null| null| 200 | 300 | 70 |0.4| null| 1 |一|
| 组|null| null|蓝色|null| null| null| null| 20 |null| 0.5| 1 |一|
| 组|null| null|黄金|null| null| null| null| 30 |null| 0.8| 1 |一|
| 保护|金|多项索赔|Y| Y| N| null| null| 10 |0.4| null| 1 |一|
| 年龄|null| null| null| null| null| 0 | 5 | 15 |0.5| null| 1 |一|
| 年龄|null| null| null| null| null| 6 | 10 | 20 |0.8| null| 1 |一|
我必须将表1连接到表2。问题是,既有范围连接,也有分类连接。我将按版本和封面连接它们。然后,例如表1中的rf_1将与表2中的rating _ factor 1和rating _ factor 2匹配,并查看rl_1中的值,我必须将使用范围与表2连接,并将第二个因子“private”与rating _ factor _ cat _a连接。并将amount,coeff和rate值追加回表1。
输出表:
| Perl诺|版本|盖|rf_1| rf_2| rf_3| rl_1| rl_2| rl_3| amt_1|系数_1|率_1| amt_2|系数2|率_2| amt_3|系数_3|率_3|
| --|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|
| ABC123| 1 |一|[“base”,“usage”]|[“group”]| null| [“500”,“private”]|[“蓝色”]| null| 50 |0.2| null| 20 |null| 0.5| null| null| null|
| cde111| 1 |一|[“base”,“usage”]|[“age”]|[“protection”,“gold_claim”,”more_more_one_claim”]|[“500”,“private”]|[“9”]|[“Y”,“Y”,“N”]| 50 |0.2| null| 20 |0.8| null| 10 |0.4| null|
| cde222| 1 |一|[“base”,“usage”]|[“group”]| null| [“300”,“business”]|[“gold”]| null| 70 |0.4| null| 30 |null| 0.8| null| null| null|
我认为我们必须对各种连接进行排列,例如[category,range],[range,category],[category,category]和[range,range],看看哪些连接在最终的表中给出了至少一个值amount,coeff,rate。
我不能使用硬编码来获取数量,coeff和速率值,必须使用循环,因为在实际情况下有10000行这样的数据。

bvuwiixz

bvuwiixz1#

你所要求的基于范围的连接,同时包括所有rf因子的类别,是不可能通过连接操作实现的,但是过滤器和连接的组合可以实现你的输出。
这是它的查询。
首先创建这两个表的临时视图。

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

字符串
然后在满足所有rf因子的条件上创建查询。

querycon=''
for i in range(1,4):
    rf = f'rf_{i}'
    rl= f'rl_{i}'
    q = f''',case
when ((d1.{rf} is not null) and (tab2_cat_values==array()) and ((cast(d1.{rl}[0] as int) >= d2.rating_factor_start) and (cast(d1.{rl}[0] as int) <= d2.rating_factor_end))) then struct(d2.rating_factor_amt,d2.rating_factor_coeff,d2.rating_factor_rate)
when ((d1.{rf} is not null) and (tab2_cat_values!=array()) and (array_size(d1.{rl})==array_size(tab2_cat_values) and (d1.{rl}==tab2_cat_values))) then struct(d2.rating_factor_amt,d2.rating_factor_coeff,d2.rating_factor_rate)
when ((d1.{rf} is not null) and (tab2_cat_values!=array()) and (array_size(d1.{rl})>1) and (array_contains(d1.{rl},tab2_cat_values[0])) and ((cast(d1.{rl}[0] as int) >= d2.rating_factor_start) and (cast(d1.{rl}[0] as int) <= d2.rating_factor_end))) then 
struct(d2.rating_factor_amt,d2.rating_factor_coeff,d2.rating_factor_rate)
else null end as jtmp_{i},
jtmp_{i}.rating_factor_amt as amt_{i},
jtmp_{i}.rating_factor_coeff as coeff_{i},
jtmp_{i}.rating_factor_rate as rate_{i}
'''
    querycon=querycon+q


在这里,我循环通过所有的因素在你的情况下,它的3,如果更多的因素来在未来,你需要修改上述情况相应。
在这种情况下,声明
1.它检查类别,如果它是空的,那么它会检查范围并获取输出。
1.然后检查两个表中的类别是否相同
1.最后,如果有更多的类别,然后去检查类别和范围条件。
此查询与如下所示的主连接数据相结合。

fquery=f'''select
d1.pol_no,d1.version,d1.cover,d1.rf_1,d1.rl_1,d1.rf_2,d1.rf_3,d1.rl_2,d1.rl_3,
(array_compact(array(d2.rating_factor_cat_a,d2.rating_factor_cat_b,d2.rating_factor_cat_c))) as tab2_cat_values
{querycon}
from df1 d1 
join df2 d2 on d1.version=d2.version 
and d1.cover=d2.cover 
and  (
    (array_compact(array(d2.rating_factor_1,d2.rating_factor_2,d2.rating_factor_3))==d1.rf_1)
    or (array_compact(array(d2.rating_factor_1,d2.rating_factor_2,d2.rating_factor_3))==d1.rf_2)
    or (array_compact(array(d2.rating_factor_1,d2.rating_factor_2,d2.rating_factor_3))==d1.rf_3))
'''

df = spark.sql(fquery)


接下来选择所需的列。

from pyspark.sql.functions import collect_set,explode_outer,col

result_cols=["amt_1","coeff_1","rate_1","amt_2","coeff_2","rate_2","amt_3","coeff_3","rate_3"]
aggexpr = [collect_set(i).alias(i) for i in result_cols]
df = df.groupBy(*df.columns[:9]).agg(*aggexpr)

for i in result_cols:
    df = df.withColumn(i,explode_outer(col(i)))
display(df)


这里,*df.columns[:9]是从表1到rl_3的列。如果将来有更多的因子,需要扩展列表直到它。
输出量:
| Perl诺|版本|盖|rf_1| rl_1| rf_2| rf_3| rl_2| rl_3| amt_1|系数_1|率_1| amt_2|系数2|率_2| amt_3|系数_3|率_3|
| --|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|--|
| ABC123| 1 |一|[“base”,“usage”]|[“500”,“private”]|[“group”]| null| [“蓝色”]| null| 50 |0.2| null| 20 |0.5| null| null| null| null|
| cde111| 1 |一|[“base”,“usage”]|[“500”,“private”]|[“age”]|[“protection”,“gold_claim”,“more_more_one_claim”]|[“9”]|[“Y”,“Y”,“N”]| 50 |0.2| null| 20 |0.8| null| 10 |0.4| null|
| cde222| 1 |一|[“base”,“usage”]|[“300”,“business”]|[“group”]| null| [“gold”]| null| 70 |0.4| null| 30 |0.8| null| null| null| null|
x1c 0d1x的数据

相关问题