pyspark 对上一行中的两个值求和

mklgxw1f  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(153)

我的 Dataframe 中有两列:“Cust_ID”和“Fill_days”。我需要为“Adjusted Days”添加一个列,如下所示:

其思想是:对于“Cust_ID”的每一个第一行,“Adjusted Days”值都应该为0。对于以下行,
如果上一行的“Fill_days”+“Adjusted Days”〈0,则为0,否则为上一行的“Fill_days”+“Adjusted Days”。如果需要在Excel中执行此操作,则使用以下公式:

C2=0
C3=IF(B2+C2<0,0,B2+C2)
C4=IF(B3+C3<0,0,B3+C3)

我可以用Pandas写代码,但是代码很慢。
在Spark里怎么做?我用的是Spark 3.2.1。

g2ieeal7

g2ieeal71#

首先,你需要一个用于排序的列。Spark不知道顺序。所有的行都可以在任何位置,除非你有一个可以告诉确切顺序的列。我已经添加了“顺序”列。
其次,引用同一列本身是不可能的。您需要一个解决方案。下面的代码将把每个“Cust_ID”的日期收集到一个列表中,并只在该列表中执行操作。完成后,使用inline展开结果。
输入:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 1, 5),
     (1, 2, 2),
     (1, 3, 1),
     (1, 4, -9),
     (1, 5, -2),
     (1, 6, 9),
     (5, 1, -2),
     (5, 2, 1),
     (5, 3, -1)],
    ['Cust_ID', 'order', 'Fill_days'])

脚本:

df = df.groupBy('Cust_ID').agg(
    F.aggregate(
        F.array_sort(F.collect_list(F.struct('order', 'Fill_days'))),
        F.expr("array(struct(bigint(null) order, 0L Fill_days, 0L Adjusted_Days))"),
        lambda acc, x: F.array_union(
            acc,
            F.array(x.withField(
                'Adjusted_Days',
                F.greatest(F.lit(0), F.element_at(acc, -1)['Fill_days'] + F.element_at(acc, -1)['Adjusted_Days'])
            ))
        )
    ).alias('a')
)
df = df.selectExpr("Cust_ID", "inline(slice(a, 2, size(a)))")

df.show()

# +-------+-----+---------+-------------+

# |Cust_ID|order|Fill_days|Adjusted_Days|

# +-------+-----+---------+-------------+

# |1      |1    |5        |0            |

# |1      |2    |2        |5            |

# |1      |3    |1        |7            |

# |1      |4    |-9       |8            |

# |1      |5    |-2       |0            |

# |1      |6    |9        |0            |

# |5      |1    |-2       |0            |

# |5      |2    |1        |0            |

# |5      |3    |-1       |1            |

# +-------+-----+---------+-------------+

为了便于理解,请分析一下这个答案,因为第二次解释这个答案并不容易。

相关问题