Pyspark -选择性重置累积和列

oalqel3c  于 2023-05-06  发布在  Spark
关注(0)|答案(1)|浏览(116)

我有下面的dataframe,我试图转换cumsum列看起来像new_cumsum列。id_B_entry_datedate,相应的id_B被引入这个种群。

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame(
    [
        (1001, 2001, "2023-04-01", "2023-04-01", False, 0, 0),
        (1001, 2001, "2023-04-02", "2023-04-01", False, 0, 0),
        (1001, 2001, "2023-04-03", "2023-04-01", False, 1, 1),
        (1001, 2001, "2023-04-04", "2023-04-01", False, 1, 1),
        (1001, 2002, "2023-04-05", "2023-04-05", True, 4, 3),
        (1001, 2001, "2023-04-05", "2023-04-01", False, 4, 4),
        (1001, 2001, "2023-04-06", "2023-04-01", False, 4, 4),
        (1001, 2002, "2023-04-06", "2023-04-05", False, 4, 3),
        (1001, 2001, "2023-04-07", "2023-04-01", False, 4, 4),
        (1001, 2002, "2023-04-07", "2023-04-05", False, 4, 3),
        (1001, 2001, "2023-04-08", "2023-04-01", False, 10, 10),
        (1001, 2002, "2023-04-08", "2023-04-05", False, 10, 9),
        (1001, 2003, "2023-04-09", "2023-04-09", True, 10, 0),
        (1001, 2001, "2023-04-09", "2023-04-01", False, 10, 10),
        (1001, 2002, "2023-04-09", "2023-04-05", False, 10, 9),
        (1001, 2001, "2023-04-10", "2023-04-01", False, 12, 12),
        (1001, 2002, "2023-04-10", "2023-04-05", False, 12, 11), 
        (1001, 2003, "2023-04-10", "2023-04-09", False, 12, 2),
        (1001, 2001, "2023-04-11", "2023-04-01", False, 13, 13),
        (1001, 2002, "2023-04-11", "2023-04-05", False, 13, 12),
        (1001, 2003, "2023-04-11", "2023-04-09", False, 13, 3),
    ],
    ["id_A", "id_B", "date", "id_B_entry_date", "reset", "cumsum", "new_cumsum"],
)
df.show()
+----+----+----------+---------------+-----+------+----------+
|id_A|id_B|      date|id_B_entry_date|reset|cumsum|new_cumsum|
+----+----+----------+---------------+-----+------+----------+
|1001|2001|2023-04-01|     2023-04-01|false|     0|         0|
|1001|2001|2023-04-02|     2023-04-01|false|     0|         0|
|1001|2001|2023-04-03|     2023-04-01|false|     1|         1|
|1001|2001|2023-04-04|     2023-04-01|false|     1|         1|
|1001|2002|2023-04-05|     2023-04-05| true|     4|         3|
|1001|2001|2023-04-05|     2023-04-01|false|     4|         4|
|1001|2001|2023-04-06|     2023-04-01|false|     4|         4|
|1001|2002|2023-04-06|     2023-04-05|false|     4|         3|
|1001|2001|2023-04-07|     2023-04-01|false|     4|         4|
|1001|2002|2023-04-07|     2023-04-05|false|     4|         3|
|1001|2001|2023-04-08|     2023-04-01|false|    10|        10|
|1001|2002|2023-04-08|     2023-04-05|false|    10|         9|
|1001|2003|2023-04-09|     2023-04-09| true|    10|         0|
|1001|2001|2023-04-09|     2023-04-01|false|    10|        10|
|1001|2002|2023-04-09|     2023-04-05|false|    10|         9|
|1001|2001|2023-04-10|     2023-04-01|false|    12|        12|
|1001|2002|2023-04-10|     2023-04-05|false|    12|        11|
|1001|2003|2023-04-10|     2023-04-09|false|    12|         2|
|1001|2001|2023-04-11|     2023-04-01|false|    13|        13|
|1001|2002|2023-04-11|     2023-04-05|false|    13|        12|
|1001|2003|2023-04-11|     2023-04-09|false|    13|         3|
+----+----+----------+---------------+-----+------+----------+

这里的逻辑是每当引入新的id_B时重置cumsum,但保留具有预先存在的id_Bs的行的cumsum
例如,在2023-04-05,id_B 2002被引入,所以我想在那个日期为它重置cumsum。碰巧在那天cumsum从1增加到4,所以new_cumsum应该是3。但是,该日期的id_B 2001的cumsum不应重置。
2023-04-09推出id_B 2003。由于该日期的cumsum保持为10,因此重置应将相应的new_cumsum计算为0。id_B 2001和2002在此日期不应重置。
我已经尝试了几种方法,以下是最接近我所需要的,但它只是不完全在那里。

w1 = Window.partitionBy("id_A").orderBy("date")
w2 = Window.partitionBy("id_A", "id_B_entry_date").orderBy("date")
w3 = Window.partitionBy("partition2", "id_A", "id_B_entry_date").orderBy("date")

df2 = (
    df
    .withColumn("diff", F.col("cumsum") - F.lag("cumsum", default=0).over(w2))
    .withColumn("partition", F.when(F.col("reset"), 1).otherwise(0))
    .withColumn("partition2", F.sum("partition").over(w1))
    .withColumn("new_cumsum_attempt", F.sum(F.col("diff")).over(w3))
    .drop("diff", "partition", "partition2")
)

df2.orderBy('date').show()
+----+----+----------+---------------+-----+------+----------+------------------+
|id_A|id_B|      date|id_B_entry_date|reset|cumsum|new_cumsum|new_cumsum_attempt|
+----+----+----------+---------------+-----+------+----------+------------------+
|1001|2001|2023-04-01|     2023-04-01|false|     0|         0|                 0|
|1001|2001|2023-04-02|     2023-04-01|false|     0|         0|                 0|
|1001|2001|2023-04-03|     2023-04-01|false|     1|         1|                 1|
|1001|2001|2023-04-04|     2023-04-01|false|     1|         1|                 1|
|1001|2001|2023-04-05|     2023-04-01|false|     4|         4|                 3|
|1001|2002|2023-04-05|     2023-04-05| true|     4|         3|                 4|
|1001|2001|2023-04-06|     2023-04-01|false|     4|         4|                 3|
|1001|2002|2023-04-06|     2023-04-05|false|     4|         3|                 4|
|1001|2001|2023-04-07|     2023-04-01|false|     4|         4|                 3|
|1001|2002|2023-04-07|     2023-04-05|false|     4|         3|                 4|
|1001|2001|2023-04-08|     2023-04-01|false|    10|        10|                 9|
|1001|2002|2023-04-08|     2023-04-05|false|    10|         9|                10|
|1001|2001|2023-04-09|     2023-04-01|false|    10|        10|                 0|
|1001|2002|2023-04-09|     2023-04-05|false|    10|         9|                 0|
|1001|2003|2023-04-09|     2023-04-09| true|    10|         0|                10|
|1001|2001|2023-04-10|     2023-04-01|false|    12|        12|                 2|
|1001|2002|2023-04-10|     2023-04-05|false|    12|        11|                 2|
|1001|2003|2023-04-10|     2023-04-09|false|    12|         2|                12|
|1001|2001|2023-04-11|     2023-04-01|false|    13|        13|                 3|
|1001|2002|2023-04-11|     2023-04-05|false|    13|        12|                 3|
|1001|2003|2023-04-11|     2023-04-09|false|    13|         3|                13|
+----+----+----------+---------------+-----+------+----------+------------------+

我已经问了一个similar question in the past,但这里需要注意的是,在某些情况下保持累积和,而不是总是重置。
提前感谢您的任何答复。

kcugc4gi

kcugc4gi1#

我能够通过窗口函数的组合和创建一个lag2列来解决这个问题,以解决具有重复日期的行(因为cumsum在任何特定日期总是相同的)。

w0 = Window.partitionBy("id_A", "id_B").orderBy("date", "id_B_entry_date")
w1 = Window.partitionBy("id_A").orderBy("date")
w2 = Window.partitionBy("id_A").orderBy("id_B_entry_date")
w3 = Window.partitionBy("partition", "id_A", "id_B").orderBy("date")

df2 = (
    df
    .withColumn("lag", F.lag("cumsum", default=0).over(w0))
    .withColumn(
        "lag2", 
        F.when(F.col("lag")==0, F.max("lag", default=0).over(w1))
        .otherwise(F.col("lag"))
    )
    .withColumn("partition", F.rank().over(w2))
    .withColumn("diff", F.col("cumsum") - F.col("lag2"))
    .withColumn("new_cumsum_attempt", F.sum(F.col("diff")).over(w3))
    .withColumn("check", F.col("new_cumsum")==F.col("new_cumsum_attempt"))
)

df2.orderBy('date').show(21)
+----+----+----------+---------------+-----+------+----------+---+----+---------+----+------------------+-----+
|id_A|id_B|      date|id_B_entry_date|reset|cumsum|new_cumsum|lag|lag2|partition|diff|new_cumsum_attempt|check|
+----+----+----------+---------------+-----+------+----------+---+----+---------+----+------------------+-----+
|1001|2001|2023-04-01|     2023-04-01|false|     0|         0|  0|   0|        1|   0|                 0| true|
|1001|2001|2023-04-02|     2023-04-01|false|     0|         0|  0|   0|        1|   0|                 0| true|
|1001|2001|2023-04-03|     2023-04-01|false|     1|         1|  0|   0|        1|   1|                 1| true|
|1001|2001|2023-04-04|     2023-04-01|false|     1|         1|  1|   1|        1|   0|                 1| true|
|1001|2001|2023-04-05|     2023-04-01|false|     4|         4|  1|   1|        1|   3|                 4| true|
|1001|2002|2023-04-05|     2023-04-05| true|     4|         3|  0|   1|       12|   3|                 3| true|
|1001|2001|2023-04-06|     2023-04-01|false|     4|         4|  4|   4|        1|   0|                 4| true|
|1001|2002|2023-04-06|     2023-04-05|false|     4|         3|  4|   4|       12|   0|                 3| true|
|1001|2001|2023-04-07|     2023-04-01|false|     4|         4|  4|   4|        1|   0|                 4| true|
|1001|2002|2023-04-07|     2023-04-05|false|     4|         3|  4|   4|       12|   0|                 3| true|
|1001|2001|2023-04-08|     2023-04-01|false|    10|        10|  4|   4|        1|   6|                10| true|
|1001|2002|2023-04-08|     2023-04-05|false|    10|         9|  4|   4|       12|   6|                 9| true|
|1001|2001|2023-04-09|     2023-04-01|false|    10|        10| 10|  10|        1|   0|                10| true|
|1001|2002|2023-04-09|     2023-04-05|false|    10|         9| 10|  10|       12|   0|                 9| true|
|1001|2003|2023-04-09|     2023-04-09| true|    10|         0|  0|  10|       19|   0|                 0| true|
|1001|2001|2023-04-10|     2023-04-01|false|    12|        12| 10|  10|        1|   2|                12| true|
|1001|2002|2023-04-10|     2023-04-05|false|    12|        11| 10|  10|       12|   2|                11| true|
|1001|2003|2023-04-10|     2023-04-09|false|    12|         2| 10|  10|       19|   2|                 2| true|
|1001|2001|2023-04-11|     2023-04-01|false|    13|        13| 12|  12|        1|   1|                13| true|
|1001|2002|2023-04-11|     2023-04-05|false|    13|        12| 12|  12|       12|   1|                12| true|
|1001|2003|2023-04-11|     2023-04-09|false|    13|         3| 12|  12|       19|   1|                 3| true|
+----+----+----------+---------------+-----+------+----------+---+----+---------+----+------------------+-----+

相关问题