Python Spark: cumulative sum by group using a DataFrame

nr9pn0ug · posted 2021-07-14 in Spark

How do I compute a cumulative sum per group, specifically using the DataFrame abstraction, in PySpark?

An example dataset:

    df = sqlContext.createDataFrame(
        [(1, 2, "a"), (3, 2, "a"), (1, 3, "b"), (2, 2, "a"), (2, 3, "b")],
        ["time", "value", "class"])

    +----+-----+-----+
    |time|value|class|
    +----+-----+-----+
    |   1|    2|    a|
    |   3|    2|    a|
    |   1|    3|    b|
    |   2|    2|    a|
    |   2|    3|    b|
    +----+-----+-----+

I would like to add a cumulative sum column of value for each class grouping, ordered by the time variable.
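(A side note: sqlContext is the legacy entry point. On Spark 2.x and later the same example DataFrame can be built through a SparkSession; a minimal sketch, assuming a session variable named spark, which is not part of the original question:)

    # Sketch: building the example DataFrame via SparkSession (Spark 2.x+).
    # The session name "spark" is an assumption; adjust to your environment.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("cumsum-example").getOrCreate()
    df = spark.createDataFrame(
        [(1, 2, "a"), (3, 2, "a"), (1, 3, "b"), (2, 2, "a"), (2, 3, "b")],
        ["time", "value", "class"])
    df.show()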


eni9jsuy1#

This can be done using a window function combined with Window.unboundedPreceding as the start of the window's range, as follows:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    windowval = (Window.partitionBy('class').orderBy('time')
                 .rangeBetween(Window.unboundedPreceding, 0))
    df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
    df_w_cumsum.show()

    +----+-----+-----+-------+
    |time|value|class|cum_sum|
    +----+-----+-----+-------+
    |   1|    3|    b|      3|
    |   2|    3|    b|      6|
    |   1|    2|    a|      2|
    |   2|    2|    a|      4|
    |   3|    2|    a|      6|
    +----+-----+-----+-------+
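Note that show() prints rows in whatever order the partitions happen to arrive; to inspect the cumulative sums per class in time order, you can sort the result explicitly, for example:

    df_w_cumsum.orderBy('class', 'time').show()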

myss37ts2#

I tried this approach and it works for me.

    from pyspark.sql import Window
    from pyspark.sql import functions as f
    import sys

    cum_sum = df.withColumn('cumsum', f.sum('value').over(
        Window.partitionBy('class').orderBy('time').rowsBetween(-sys.maxsize, 0)))
    cum_sum.show()
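The -sys.maxsize lower bound works, but on Spark 2.1 and later the same unbounded frame is usually written with the named boundaries Window.unboundedPreceding and Window.currentRow; a sketch of the equivalent call, assuming the question's df:

    # Sketch: same row-based cumulative sum, using named frame boundaries
    # instead of -sys.maxsize (Spark 2.1+).
    cum_sum = df.withColumn(
        'cumsum',
        f.sum('value').over(
            Window.partitionBy('class').orderBy('time')
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    cum_sum.show()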
