Pyspark正向填充非空数据

rsaldnfx  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(144)

我目前有一个 Dataframe ,其中显示了用户id、日期和y_n列,如下所示:
| 用户标识|日期|是(_N)|
| - -|- -|- -|
| 一个|2022年6月1日|不|
| 一个|2022年6月5日|Y型|
| 一个|2022年6月20日|不|
| 2个|2022年6月1日|不|
| 2个|2022年6月2日|不|
| 2个|2022年7月7日|Y型|
| 2个|2022年7月7日|不|
| 2个|2022年7月8日|不|
我尝试对任何给定的用户,如果他们在任何之前的日期有Y,我已经在下面添加了一列,any_previous_y,它显示了预期的结果。
| 用户标识|日期|是(_N)|任何上一个|
| - -|- -|- -|- -|
| 一个|2022年6月1日|不|不|
| 一个|2022年6月5日|Y型|不|
| 一个|2022年6月20日|不|Y型|
| 2个|2022年6月1日|不|不|
| 2个|2022年6月2日|不|不|
| 2个|2022年7月7日|Y型|不|
| 2个|2022年7月7日|不|不|
| 2个|2022年7月8日|不|Y型|
在这里,您可以看到user_id 1在2022-06-05的值为Y,因此其2022-06-20记录的any_previous_y值为Y。请注意,user_id 2在2022-07-07有两个记录,我希望这两个记录的any_previous_y值都为N,因为在这一天之前,它们的值都不为Y。
我一直在尝试使用partitionBy()派生此列,但到目前为止,我还无法获得所需的内容。

6za6bjd0

6za6bjd01#

您可以使用lag并按datey_n字段排序。由于在排序时,"Y"会出现在"N"之后,因此即使在同一日期有2个值,lag也会提供"Y"

import pyspark.sql.functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumn('any_prev_y', 
               func.coalesce(func.lag('yn').over(wd.partitionBy('user_id').orderBy('dt', 'yn')),
                             func.lit('N')
                             )
               ). \
    show()

# +-------+----------+---+----------+

# |user_id|        dt| yn|any_prev_y|

# +-------+----------+---+----------+

# |      1|2022-06-01|  N|         N|

# |      1|2022-06-05|  Y|         N|

# |      1|2022-06-20|  N|         Y|

# |      2|2022-06-01|  N|         N|

# |      2|2022-06-02|  N|         N|

# |      2|2022-07-07|  N|         N|

# |      2|2022-07-07|  Y|         N|

# |      2|2022-07-08|  N|         Y|

# +-------+----------+---+----------+

另一种方法是使用数组和array_contains。您将同一日期的所有Y/N收集到一个数组中,并检查该数组中的前一个日期是否有"Y"

data_sdf. \
    groupBy('user_id', 'dt'). \
    agg(func.collect_list('yn').alias('yn_arr')). \
    withColumn('any_prev_y', 
               func.when(func.array_contains(func.lag('yn_arr').over(wd.partitionBy('user_id').orderBy('dt')), func.lit('Y')), func.lit('Y')).
               otherwise(func.lit('N'))
               ). \
    orderBy('user_id', 'dt'). \
    selectExpr('user_id', 'dt', 'explode(yn_arr) as yn', 'any_prev_y'). \
    show()

# +-------+----------+---+----------+

# |user_id|        dt| yn|any_prev_y|

# +-------+----------+---+----------+

# |      1|2022-06-01|  N|         N|

# |      1|2022-06-05|  Y|         N|

# |      1|2022-06-20|  N|         Y|

# |      2|2022-06-01|  N|         N|

# |      2|2022-06-02|  N|         N|

# |      2|2022-07-07|  Y|         N|

# |      2|2022-07-07|  N|         N|

# |      2|2022-07-08|  N|         Y|

# +-------+----------+---+----------+

相关问题