pyspark 子查询中带比较的Spark sql左连接

4uqofj5v  于 2023-02-18  发布在  Spark
关注(0)|答案(1)|浏览(182)

我有以下2个 Dataframe :
df_a:
| 身份证|日期|代码|
| - ------|- ------|- ------|
| 1个|二〇二一年六月二十七日|A类|
DF_B:
| 身份证|日期|代码|
| - ------|- ------|- ------|
| 1个|二〇二一年五月十九日|A类|
| 1个|二○二一年五月三十一日|乙|
| 1个|二〇二一年八月二十七日|C级|
我想使用df_b. code更新df_a. code,条件如下:
使用df_b中的行,其中www.example.com是df_a.日期之前的最新版本。b.date is latest prior to the df_a.date.
由于df_b.日期"2021 - 05 - 31"是"2021 - 06 - 27"之前的最新日期,因此df_a.代码将更新为"B"
我试过:

select a.id, b.code
from df_a left join df_b
on a.id = b.id
and b.date = (select max(b.date) from df_b where id = a.id and date <= a.date)

但是我得到了"相关的标量子查询只能在过滤器/聚合/项目和一些命令中使用"的错误

jexiocij

jexiocij1#

可以使用窗口函数:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = Window.partitionBy(df_a.id).orderBy(df_b.date.desc())
(
    df_a
    .join(df_b,['id'])
    .filter(df_a.date > df_b.date)
    .withColumn("r", F.row_number().over(win))
    .filter(F.col("r")==1)
    .select(df_a.id, df_a.date, df_b.code)
).show()

输出:

| id|      date|code|
+---+----------+----+
|  1|2021-06-27|   B|
+---+----------+----+

相关问题