I have the dataframe below, and I am trying to transform the `cumsum` column so that it looks like the `new_cumsum` column. `id_B_entry_date` is the `date` on which the corresponding `id_B` was introduced into this population.
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame(
    [
        (1001, 2001, "2023-04-01", "2023-04-01", False, 0, 0),
        (1001, 2001, "2023-04-02", "2023-04-01", False, 0, 0),
        (1001, 2001, "2023-04-03", "2023-04-01", False, 1, 1),
        (1001, 2001, "2023-04-04", "2023-04-01", False, 1, 1),
        (1001, 2002, "2023-04-05", "2023-04-05", True, 4, 3),
        (1001, 2001, "2023-04-05", "2023-04-01", False, 4, 4),
        (1001, 2001, "2023-04-06", "2023-04-01", False, 4, 4),
        (1001, 2002, "2023-04-06", "2023-04-05", False, 4, 3),
        (1001, 2001, "2023-04-07", "2023-04-01", False, 4, 4),
        (1001, 2002, "2023-04-07", "2023-04-05", False, 4, 3),
        (1001, 2001, "2023-04-08", "2023-04-01", False, 10, 10),
        (1001, 2002, "2023-04-08", "2023-04-05", False, 10, 9),
        (1001, 2003, "2023-04-09", "2023-04-09", True, 10, 0),
        (1001, 2001, "2023-04-09", "2023-04-01", False, 10, 10),
        (1001, 2002, "2023-04-09", "2023-04-05", False, 10, 9),
        (1001, 2001, "2023-04-10", "2023-04-01", False, 12, 12),
        (1001, 2002, "2023-04-10", "2023-04-05", False, 12, 11),
        (1001, 2003, "2023-04-10", "2023-04-09", False, 12, 2),
        (1001, 2001, "2023-04-11", "2023-04-01", False, 13, 13),
        (1001, 2002, "2023-04-11", "2023-04-05", False, 13, 12),
        (1001, 2003, "2023-04-11", "2023-04-09", False, 13, 3),
    ],
    ["id_A", "id_B", "date", "id_B_entry_date", "reset", "cumsum", "new_cumsum"],
)
df.show()
+----+----+----------+---------------+-----+------+----------+
|id_A|id_B| date|id_B_entry_date|reset|cumsum|new_cumsum|
+----+----+----------+---------------+-----+------+----------+
|1001|2001|2023-04-01| 2023-04-01|false| 0| 0|
|1001|2001|2023-04-02| 2023-04-01|false| 0| 0|
|1001|2001|2023-04-03| 2023-04-01|false| 1| 1|
|1001|2001|2023-04-04| 2023-04-01|false| 1| 1|
|1001|2002|2023-04-05| 2023-04-05| true| 4| 3|
|1001|2001|2023-04-05| 2023-04-01|false| 4| 4|
|1001|2001|2023-04-06| 2023-04-01|false| 4| 4|
|1001|2002|2023-04-06| 2023-04-05|false| 4| 3|
|1001|2001|2023-04-07| 2023-04-01|false| 4| 4|
|1001|2002|2023-04-07| 2023-04-05|false| 4| 3|
|1001|2001|2023-04-08| 2023-04-01|false| 10| 10|
|1001|2002|2023-04-08| 2023-04-05|false| 10| 9|
|1001|2003|2023-04-09| 2023-04-09| true| 10| 0|
|1001|2001|2023-04-09| 2023-04-01|false| 10| 10|
|1001|2002|2023-04-09| 2023-04-05|false| 10| 9|
|1001|2001|2023-04-10| 2023-04-01|false| 12| 12|
|1001|2002|2023-04-10| 2023-04-05|false| 12| 11|
|1001|2003|2023-04-10| 2023-04-09|false| 12| 2|
|1001|2001|2023-04-11| 2023-04-01|false| 13| 13|
|1001|2002|2023-04-11| 2023-04-05|false| 13| 12|
|1001|2003|2023-04-11| 2023-04-09|false| 13| 3|
+----+----+----------+---------------+-----+------+----------+
The logic here is to reset `cumsum` whenever a new `id_B` is introduced, but to keep the cumulative sum on rows whose `id_B` already existed.
For example, on 2023-04-05 `id_B` 2002 is introduced, so I want to reset `cumsum` for it on that date. It happens that `cumsum` increases from 1 to 4 on that day, so `new_cumsum` should be 3. However, `cumsum` for `id_B` 2001 should not be reset on that date.
On 2023-04-09 `id_B` 2003 is introduced. Since `cumsum` stays at 10 on that date, the reset should yield a corresponding `new_cumsum` of 0. `id_B` 2001 and 2002 should not be reset on that date.
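Stated another way, the target seems to be: for each `id_B`, `new_cumsum` on a date equals that date's `cumsum` minus the population-wide `cumsum` of the day just before that `id_B`'s entry date. A minimal plain-Python check of this reading against the sample values (the `baseline` helper and the hard-coded dicts are illustrative, not part of the question):

```python
# Sample rows: (id_B, date, id_B_entry_date, cumsum, expected new_cumsum)
rows = [
    (2001, "2023-04-05", "2023-04-01", 4, 4),
    (2002, "2023-04-05", "2023-04-05", 4, 3),
    (2003, "2023-04-09", "2023-04-09", 10, 0),
    (2003, "2023-04-10", "2023-04-09", 12, 2),
    (2002, "2023-04-11", "2023-04-05", 13, 12),
]

# Population-wide cumsum per date (it is identical across id_Bs on a date).
cumsum_by_date = {
    "2023-04-01": 0, "2023-04-02": 0, "2023-04-03": 1, "2023-04-04": 1,
    "2023-04-05": 4, "2023-04-06": 4, "2023-04-07": 4, "2023-04-08": 10,
    "2023-04-09": 10, "2023-04-10": 12, "2023-04-11": 13,
}

def baseline(entry_date):
    """cumsum on the day before entry_date (0 if there is no earlier day)."""
    dates = sorted(cumsum_by_date)
    i = dates.index(entry_date)
    return cumsum_by_date[dates[i - 1]] if i > 0 else 0

for id_b, date, entry, cs, expected in rows:
    assert cs - baseline(entry) == expected
```

Every sample row satisfies `new_cumsum = cumsum - baseline(id_B_entry_date)`, which suggests the problem reduces to finding each entry date's "day before" cumsum despite the duplicate dates.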
I have tried several approaches; the following comes closest to what I need, but it is just not quite there.
w1 = Window.partitionBy("id_A").orderBy("date")
w2 = Window.partitionBy("id_A", "id_B_entry_date").orderBy("date")
w3 = Window.partitionBy("partition2", "id_A", "id_B_entry_date").orderBy("date")
df2 = (
    df
    # day-over-day change in cumsum within each (id_A, id_B_entry_date)
    .withColumn("diff", F.col("cumsum") - F.lag("cumsum", default=0).over(w2))
    # running count of resets, splitting the data into segments at each reset
    .withColumn("partition", F.when(F.col("reset"), 1).otherwise(0))
    .withColumn("partition2", F.sum("partition").over(w1))
    # re-accumulate the diffs within each reset segment
    .withColumn("new_cumsum_attempt", F.sum(F.col("diff")).over(w3))
    .drop("diff", "partition", "partition2")
)
df2.orderBy('date').show()
+----+----+----------+---------------+-----+------+----------+------------------+
|id_A|id_B| date|id_B_entry_date|reset|cumsum|new_cumsum|new_cumsum_attempt|
+----+----+----------+---------------+-----+------+----------+------------------+
|1001|2001|2023-04-01| 2023-04-01|false| 0| 0| 0|
|1001|2001|2023-04-02| 2023-04-01|false| 0| 0| 0|
|1001|2001|2023-04-03| 2023-04-01|false| 1| 1| 1|
|1001|2001|2023-04-04| 2023-04-01|false| 1| 1| 1|
|1001|2001|2023-04-05| 2023-04-01|false| 4| 4| 3|
|1001|2002|2023-04-05| 2023-04-05| true| 4| 3| 4|
|1001|2001|2023-04-06| 2023-04-01|false| 4| 4| 3|
|1001|2002|2023-04-06| 2023-04-05|false| 4| 3| 4|
|1001|2001|2023-04-07| 2023-04-01|false| 4| 4| 3|
|1001|2002|2023-04-07| 2023-04-05|false| 4| 3| 4|
|1001|2001|2023-04-08| 2023-04-01|false| 10| 10| 9|
|1001|2002|2023-04-08| 2023-04-05|false| 10| 9| 10|
|1001|2001|2023-04-09| 2023-04-01|false| 10| 10| 0|
|1001|2002|2023-04-09| 2023-04-05|false| 10| 9| 0|
|1001|2003|2023-04-09| 2023-04-09| true| 10| 0| 10|
|1001|2001|2023-04-10| 2023-04-01|false| 12| 12| 2|
|1001|2002|2023-04-10| 2023-04-05|false| 12| 11| 2|
|1001|2003|2023-04-10| 2023-04-09|false| 12| 2| 12|
|1001|2001|2023-04-11| 2023-04-01|false| 13| 13| 3|
|1001|2002|2023-04-11| 2023-04-05|false| 13| 12| 3|
|1001|2003|2023-04-11| 2023-04-09|false| 13| 3| 13|
+----+----+----------+---------------+-----+------+----------+------------------+
I asked a similar question in the past, but the caveat here is that in some cases the cumulative sum must be kept rather than always reset.
Thanks in advance for any answers.
1 Answer
I was able to solve this with a combination of window functions, plus a `lag2` column to handle rows with duplicate dates (since `cumsum` is always the same on any given date).