PySpark中的pct_change函数

bsxbgnwa  于 2022-12-17  发布在  Spark
关注(0)|答案(1)|浏览(126)

我需要在一个新的pct_change列中分别为每个ID计算value列中值的百分比变化。示例df如下。
互联网上的一些消息来源说,在pyspark2.4+中有一个pct_change()函数,这将使这一点变得容易,但我在3.0.1上,我无法从pyspark.sql.functions导入它。

ID   value pct_change 
    1     1      nan
    1     2       1
    1     4       1
    2     1      nan
    2     1       0
    2    0.5    -0.5
    3     5      nan
    3     5       0
    3     7      0.4
vjhs03f7

vjhs03f71#

在pyspark中使用窗口功能
代码和逻辑如下

w =Window.partitionBy('ID').orderBy('index')#.rowsBetween(-1,0)

(df.withColumn('index', monotonically_increasing_id())#Create an index to OrderBy
 .withColumn('pct_change', (col('value')-lag('value').over(w))#Calculate change in consecutive rows
  /lag('value').over(w))#Find rate of change in consecutive row
 .drop('index')#Drop the ordering column
).show()

+---+-----+----------+
| ID|value|pct_change|
+---+-----+----------+
|  1|  1.0|      null|
|  1|  2.0|       1.0|
|  1|  4.0|       1.0|
|  2|  1.0|      null|
|  2|  1.0|       0.0|
|  2|  0.5|      -0.5|
|  3|  5.0|      null|
|  3|  5.0|       0.0|
|  3|  7.0|       0.4|
+---+-----+----------+

相关问题