pyspark 日期范围间隔的左联接

4jb9z9bj  于 2023-03-01  发布在  Spark
关注(0)|答案(3)|浏览(218)

我有两个 Dataframe
DF1
| 用户标识|日期1|
| - ------|- ------|
| 1个|2023年1月1日|
| 第二章|2020年2月15日|
| 三个|二○二二年三月二日|
和DF2
| 用户标识|入组日期|退出日期|
| - ------|- ------|- ------|
| 1个|2018年6月1日|零|
| 1个|二○ ○八年一月一日|2012年1月1日|
| 第二章|2010年2月2日|2020年2月13日|
| 三个|2011年1月1日|零|
我想要以下内容
DF1
| 用户标识|日期1|标签|
| - ------|- ------|- ------|
| 1个|2023年1月1日|真的|
| 第二章|2020年1月15日|假|
| 三个|二○二二年三月二日|真的|
我尝试了以下方法,但没有效果:df1 = df1.join(df2,"用户标识","左"). withColumn("标签",F.当(F.列("日期1")〉= F.列("入口日期")& F.列("日期1")〈= F.列("存在日期")|F.列("日期1")〉= F.列("入口日期")& F.列("存在日期").为空),亮起("True")).否则(亮起("False"))
如果user_id的日期1介于entrance_date和exist_date之间,则构造标记列

dwbf0jvd

dwbf0jvd1#

你需要
1.将列名包含在F.col('<colname>')
1.简化条件语句(if-else子句)
1.使用F.isnull()代替is null
下面的代码未经测试,但应该可以完成

import pyspark.sql.functions as F

df1 = df1 \
    .withColumn(
        'tag',
        F.when(
            F.col('Date 1') >= F.col('entrance_date'),
            F.when(
                (F.col('Date 1') <= F.col('exit_date')) | (F.isanull(F.col('exit_date'))),
                F.lit("True")
            ).otherwise(F.lit("False"))
        ).otherwise(F.lit("False"))
    )
hmmo2u0o

hmmo2u0o2#

这是可行的:

df1.alias("df1").join(df2.alias("df2"), [F.col("df1.user_id")==F.col("df2.user_id")])\
.withColumn("Tag", F.when(F.col("df1.date_1").between(F.col("df2.entrance_date"), F.coalesce(F.col("df2.exit_date"), F.lit("2099-03-02").cast("date"))), "true").otherwise("false"))\
.groupBy("df1.user_id")\
.agg(F.first("date_1").alias("date_1"), F.sum(F.when(F.col("tag") == "true", 1).otherwise(0)).cast("boolean").alias("tag"))\
.show()

输入:
东风1-东风2

输出:

bnlyeluc

bnlyeluc3#

@sud的答案很好。我个人更喜欢将NULL end_date列设置为'9999-12- 31'之类的有用值,假设这意味着一个'open'结束日期。这有助于降低复杂性,特别是当需要执行多个操作时。

# fill the end_date NULLs
df_2 = df_2.withColumn('exit_date', 
  F.coalesce('exit_date', F.to_timestamp(F.lit('9999-12-31')))
)

df = df_1.join(df_2, on='user_id', how='inner')

# additional OR condition's not required
df.withColumn('tag',
  F.when(F.col('date_1').between(
    F.col('entrance_date'), F.col('exit_date')
  ), F.lit(True)).otherwise(F.lit(False))
).show()

相关问题