Applying a Window function to calculate differences in pySpark

nfs0ujit  posted on 2023-01-29  in Spark

I am using pySpark and have set up a DataFrame with two columns representing a daily asset price, as follows:

ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])

When I apply df.show() I get:

+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+

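That is all fine. I would like to have another column containing the day-to-day return of the price column, i.e. something like
(price(day2)-price(day1))/(price(day1))
After much research, I was told that this is most efficiently accomplished by applying the pyspark.sql.window functions, but I cannot see how to do it.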
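
For reference, the values the new column should contain for the four sample prices can be worked out in plain Python, without Spark. This is only a sketch of the formula above; the function name daily_returns is made up for this illustration and is not part of any library.

```python
def daily_returns(prices):
    """Return (p[t] - p[t-1]) / p[t-1] for each day.

    The first day has no previous price, so its return is None.
    """
    if not prices:
        return []
    returns = [None]
    # Pair each price with the one that follows it.
    for prev, cur in zip(prices, prices[1:]):
        returns.append((cur - prev) / prev)
    return returns


prices = [33.3, 31.1, 51.2, 21.3]
for day, ret in enumerate(daily_returns(prices), start=1):
    print(day, ret)
```

Day 1 prints None, which corresponds to the null a Spark window function would give for the first row.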

olqngx59  #1

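You can bring in the previous day's price with the lag function and then add another column that computes the actual day-to-day return from the two columns. However, you have to tell Spark how to partition your data and/or how to order it so that lag is well defined, something like this: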

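from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

# Add a constant column so there is something to partition by
dfu = df.withColumn('user', lit('tmoore'))

# lag() needs an ordered window, so order by day within each user
df_lag = dfu.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                 .over(Window.partitionBy("user").orderBy("day")))

# Note: this divides by the current day's price, which is what the output below shows.
# The formula in the question divides by the previous day's price instead,
# i.e. it uses df_lag['prev_day_price'] as the denominator.
result = df_lag.withColumn('daily_return', 
          (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'] )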

>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+---+-----+-------+--------------+--------------------+

Here is a longer introduction to Window functions in Spark.
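
To make the point about partitioning and ordering concrete, here is a plain-Python emulation of what lag computes over a window partitioned by one column and ordered by another. It is an illustration only, not Spark code: the helper lag_by_partition, the "prev" field, and the second user "other" are all made up for this sketch.

```python
from collections import defaultdict


def lag_by_partition(rows, partition_key, order_key, value_key):
    """Emulate lag(value, 1) over (partition by partition_key order by order_key).

    rows is a list of dicts; returns new dicts with an extra 'prev' field.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row)

    out = []
    for key in groups:
        # Without this sort, "previous row" would depend on arrival order.
        ordered = sorted(groups[key], key=lambda r: r[order_key])
        prev = None  # the first row of each partition has no previous value
        for row in ordered:
            out.append({**row, "prev": prev})
            prev = row[value_key]
    return out


# Rows deliberately out of order, with a hypothetical second asset mixed in.
rows = [
    {"user": "tmoore", "day": 3, "price": 51.2},
    {"user": "tmoore", "day": 1, "price": 33.3},
    {"user": "other", "day": 1, "price": 10.0},
    {"user": "tmoore", "day": 2, "price": 31.1},
    {"user": "other", "day": 2, "price": 12.0},
    {"user": "tmoore", "day": 4, "price": 21.3},
]

for row in lag_by_partition(rows, "user", "day", "price"):
    prev = row["prev"]
    # Return relative to the previous day's price, as in the question.
    ret = None if prev is None else (row["price"] - prev) / prev
    print(row["user"], row["day"], row["price"], prev, ret)
```

The previous price never leaks from one user into another, and the result does not depend on the order in which the rows arrive. Those are the two guarantees that partitionBy and orderBy give the lag call in Spark.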

xytpbqjk  #2

The lag function can help you with this use case.

from pyspark.sql.window import Window
import pyspark.sql.functions as func

### Defining the window (ordered by day so that lag() is well defined)
Windowspec=Window.orderBy("day")

### Calculating lag of price at each day level
prev_day_price= df.withColumn('prev_day_price',
                        func.lag(df['price'])
                                .over(Windowspec))

### Calculating the daily return relative to the previous day's price, as in the question
result = prev_day_price.withColumn('daily_return', 
          (prev_day_price['price'] - prev_day_price['prev_day_price']) / 
          prev_day_price['prev_day_price'] )
