仅保留基于具有垂直条件PySpark的列的更新日期

2fjabf4q  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(151)

对于每个ID,我希望根据某些条件仅选择基于“A”列的更新记录。
此用例只是一个示例。以下几点通过添加以下条件,使垂直处理数据的练习变得复杂:
1 -我想跟踪列“A”上的所有更改,只保留该值的第一次出现
2 -我想保留 NULL
3 -“A”可以高于、低于或等于前一个值
4 -我没有 dt_run 时间戳。因此,我想假设如果在同一天内我有不同的“A”值,这是由 dt_run 不可排序引起的。如果这一天包含等于前一天和后一天的值,我们可以删除它们(假设我们可以直接转到下一次更新[示例中的粗体])。如果这一天包含不同的值,则将其指定为该值[示例中的斜体]。
5 -我的预期输出是仅在一行中包含 ID-dt_run
Desidered output
| 识别码|A级|dt_运行|
| - -|- -|- -|
| 一个|四十五|2022年2月11日|
|1| 72岁|2022年2月13日|
|1| 四十五岁|2022年2月13日|
|1| 72岁|2022年2月13日|
| 一个|七十二人|2022年2月15日|
| 一个|四十五|2022年2月16日|
| 2个|八十八个|2022年2月16日|
| 2个|八十八个|2022年2月16日|
| * 二 *| 八十八|2022年2月17日|
| * 二 *| 七十七岁|2022年2月17日|
| * 二 | 空值 *| 2022年2月17日|
| 2个|空值|2022年2月18日|
| 2个|九十二|2022年2月19日|
Desidered output
| 识别码|A级|dt_运行|
| - -|- -|- -|
| 一个|四十五|2022年2月11日|
| 一个|七十二人|2022年2月15日|
| 一个|四十五|2022年2月16日|
| 2个|八十八个|2022年2月16日|
| 2个|七十七个|2022年2月17日|
| 2个|空值|2022年2月18日|
| 2个|九十二|2022年2月19日|
我试着使用了几个过于复杂的窗口函数。有没有简单的方法来解决这些垂直问题?
谢谢你的帮助!

tvz2xvvm

tvz2xvvm1#

可以使用窗口函数实现。请参见下面的逻辑和代码

W=Window.partitionBy('ID').orderBy('dt_run')
new = (df.withColumn('x',row_number().over(W))#Create Row number for each ID
       .withColumn('y',max('x').over(Window.partitionBy('A','ID')))#Find maximum index in a combination of A and ID
       .where((col('x')==col('y'))|(col('x')==1))#Filter where index and maximum index are equal or the index is the first
       .orderBy('ID','dt_run')#reorder the frame
       .drop('x','y')#drop unwanted columns
      ).show(truncate=False)

+---+----+----------+
|ID |A   |dt_run    |
+---+----+----------+
|1  |45  |2022-02-11|
|1  |72  |2022-02-15|
|1  |45  |2022-02-16|
|2  |88  |2022-02-16|
|2  |77  |2022-02-17|
|2  |88  |2022-02-17|
|2  |null|2022-02-18|
|2  |92  |2022-02-19|
+---+----+----------+

相关问题