Python Spark cumulative sum by group using DataFrame

jpfvwuh4  posted on 2022-11-16  in  Apache

How do I compute a cumulative sum per group in PySpark, specifically using the DataFrame abstraction?
Here is an example dataset:

df = sqlContext.createDataFrame( [(1,2,"a"),(3,2,"a"),(1,3,"b"),(2,2,"a"),(2,3,"b")], 
                                 ["time", "value", "class"] )

+----+-----+-----+
|time|value|class|
+----+-----+-----+
|   1|    2|    a|
|   3|    2|    a|
|   1|    3|    b|
|   2|    2|    a|
|   2|    3|    b|
+----+-----+-----+

I would like to add a column holding the cumulative sum of value within each class group, taken over the (ordered) time variable.
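
To make the requested result concrete, here is a minimal plain-Python sketch (not Spark code) of the running total per class, ordered by time, for the sample rows above. The function name cumulative_sum_by_group is hypothetical and used only for illustration.

```python
from collections import defaultdict

# Same rows as the sample DataFrame: (time, value, class)
rows = [(1, 2, "a"), (3, 2, "a"), (1, 3, "b"), (2, 2, "a"), (2, 3, "b")]


def cumulative_sum_by_group(rows):
    """Return (time, value, class, cum_sum) tuples, ordered by class, then time.

    Hypothetical helper for illustration only.
    """
    totals = defaultdict(int)  # running total per class
    result = []
    for time, value, cls in sorted(rows, key=lambda r: (r[2], r[0])):
        totals[cls] += value
        result.append((time, value, cls, totals[cls]))
    return result


for row in cumulative_sum_by_group(rows):
    print(row)
# (1, 2, 'a', 2)
# (2, 2, 'a', 4)
# (3, 2, 'a', 6)
# (1, 3, 'b', 3)
# (2, 3, 'b', 6)
```

The last element of each tuple is the value the new column should contain for that row.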

wpx232ag  1#

This can be done by combining a window function with the Window.unboundedPreceding value in the window's range.

kadbb459  2#

An update to the previous answers. The correct and precise way to do it is:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rowsBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
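
One plausible reason to prefer rowsBetween over rangeBetween (an assumption; the answer does not state its reasoning) is how the two frame types treat rows that tie on the ordering column. The plain-Python sketch below mimics both frame semantics for a single partition. The data and function names are hypothetical, and no Spark is involved.

```python
from itertools import groupby

# Hypothetical data for one partition, already sorted by time: (time, value).
# Two rows share time == 2.
partition = [(1, 10), (2, 20), (2, 30), (3, 40)]


def rows_frame_cumsum(partition):
    """ROWS frame: each row sums itself and every row physically before it."""
    total, out = 0, []
    for _, value in partition:
        total += value
        out.append(total)
    return out


def range_frame_cumsum(partition):
    """RANGE frame: each row sums every row whose time is <= its own time,
    so rows tied on time all receive the same total."""
    total, out = 0, []
    for _, peers in groupby(partition, key=lambda r: r[0]):
        peers = list(peers)
        total += sum(value for _, value in peers)
        out.extend([total] * len(peers))
    return out


print(rows_frame_cumsum(partition))   # [10, 30, 60, 100]
print(range_frame_cumsum(partition))  # [10, 60, 60, 100]
```

The sample dataset in the question has no duplicate time within a class, so both frame types give the same result there.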

ruarlubt  3#

I tried this approach and it worked for me.

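from pyspark.sql import Window
from pyspark.sql import functions as f
import sys

# PySpark treats a frame start of -sys.maxsize as unbounded preceding;
# Window.unboundedPreceding is the clearer spelling.
# df is the DataFrame from the question.
cum_sum = df.withColumn(
    'cumsum',
    f.sum('value').over(
        Window.partitionBy('class').orderBy('time').rowsBetween(-sys.maxsize, 0)))
cum_sum.show()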

gab6jxml  4#

I created this function for my own use; it is available at this link: kolang / column functions / cumulative sum

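from typing import List, Optional, Union

from pyspark.sql import Column, Window
from pyspark.sql import functions as F

def cumulative_sum(col: Union[Column, str],
                   on_col: Union[Column, str],
                   ascending: bool = True,
                   partition_by: Optional[Union[Column, str, List[Union[Column, str]]]] = None) -> Column:
    # Order descending when ascending=False.
    on_col = on_col if ascending else F.desc(on_col)
    # rangeBetween gives rows with equal on_col values the same running total.
    if partition_by is None:
        w = Window.orderBy(on_col).rangeBetween(Window.unboundedPreceding, 0)
    else:
        w = Window.partitionBy(partition_by).orderBy(on_col).rangeBetween(Window.unboundedPreceding, 0)
    return F.sum(col).over(w)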
