在一个日期范围内的PySpark Sum计算

zi8p0yeb  于 2023-06-28  发布在  Spark
关注(0)|答案(1)|浏览(124)

我有一个像下面这样的pyspark.DataFrame,列为idyearmoney。为了简单起见,我只取了一个id,但可能有多个。

id    year    money

1      2019    10

1      2018    15

1      2013    13

1      2009    10

1      2015    10

1      2014    11

在每个id和期间的结果DataFrame中,我想要过去连续3年的金额总和,不包括记录年份。
例如,对于2019年,我只想取2018年,2017年和2016年的money之和。因为我们只有2018年,所以总数是15。
另一个例子是2015年,我想取2014年、2013年和2012年的money之和。因为只有前2个,所以它的总和是24。
生成的DataFrame如下所示。

id     year    sum_money     

1      2019      15   

1      2018      10

1      2015      24

1      2014      13

1      2013      0

1      2009      0

如何才能达到预期的效果。lag函数是否提供任何这样的功能来只查找我想要的那些年份,或者是否有任何其他方法。

我的方法

我的方法是把历年的累积总和,按年递减排序。然后,对于每个id和年份,找到刚好小于预期窗口的最大年份。
比如说2019年和window = 3,开始的年份是2016年。因此,数据集中的最小年份,即2015年,是我们必须采取的。对应于2015年,填写该年的cum_sum。
然后在最终结果列中取两个累计和的差值与当年和的值。因此,2019年将是69 - 44 - 10 = 15。其他记录(idyear)也是如此。最终数据如下所示。

id    year    money   cum_sum     min_year    res_sum    diff
1      2019    10         69        2015        44        15
1      2018    15         59        2014        34        10
1      2015    10         44        2009        10        24
1      2014    11         34        2009        10        13
1      2013    13         23        2009        10        0
1      2009    10         10        0            0        0

我正在想一个更简单的办法。

yc0p9oo0

yc0p9oo01#

pyspark中,我们可以使用rangeBetween,正如@samkart所指出的:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window

data = [
    {'id': 1, 'year': 2019, 'money': 10},
    {'id': 1, 'year': 2018, 'money': 15},
    {'id': 1, 'year': 2013, 'money': 13},
    {'id': 1, 'year': 2009, 'money': 10},
    {'id': 1, 'year': 2015, 'money': 10},
    {'id': 1, 'year': 2014, 'money': 11}
]

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data)

# Calculate the sum over the preceding 3 years
window = Window.partitionBy("id").orderBy("year").rangeBetween(-3, -1)
df = df.withColumn("previous_3_years_sum", sum("money").over(window))
df = df.fillna(0, subset=["previous_3_years_sum"])
df.show()

输出:

+---+-----+----+--------------------+
| id|money|year|previous_3_years_sum|
+---+-----+----+--------------------+
|  1|   10|2009|                   0|
|  1|   13|2013|                   0|
|  1|   11|2014|                  13|
|  1|   10|2015|                  24|
|  1|   15|2018|                  10|
|  1|   10|2019|                  15|
+---+-----+----+--------------------+

这个解决方案也让我觉得更加优雅。窗口规格非常灵活和强大。我们不需要创建假条目,也不需要转移等。最初我提供了一个使用pandas的解决方案:

import pandas as pd

data = {
    'id': [1, 1, 1, 1, 1, 1],
    'year': [2019, 2018, 2013, 2009, 2015, 2014],
    'money': [10, 15, 13, 10, 10, 11]
}

df = pd.DataFrame(data)

#Fill in the missing years with 0s
df.set_index('year', inplace=True)
min_year = df.index.min()
max_year = df.index.max()
all_years = range(min_year, max_year + 1)
df = df.reindex(all_years).fillna(0)
df = df.reset_index().rename(columns={'index': 'year'})

#Sort by years, then cumsum shift by 3, diff and shift again to align the df
df=df.sort_values('year')
df['cum']=df.money.cumsum()
df['previous_3_years_sum']=(df.cum -df.cum.shift(3)).shift(1).fillna(0)
df=df.query('money>0')[['id','year','previous_3_years_sum']] #Filter for artifically inserted entries again
df

输出:

id  year    previous_3_years_sum
0   1.0 2009    0.0
4   1.0 2013    0.0
5   1.0 2014    13.0
6   1.0 2015    24.0
9   1.0 2018    10.0
10  1.0 2019    15.0

编辑:
我认为真实的的问题是如何将其用于具体的季度报告。我用Map解决了这个问题。我们将年份列乘以4,以腾出4个季度的空间:2000被Map到8000,并且8000现在表示Q1,8001表示Q2等。然后我们可以用rangeBetween表示12个四分之一。

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import regexp_replace

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the data
data = [
    (1, 10, "2009-Q1"),
    (1, 13, "2013-Q1"),
    (1, 11, "2014-Q1"),
    (1, 10, "2015-Q1"),
    (1, 15, "2018-Q1"),
    (1, 10, "2019-Q1")
]

# Create the DataFrame
df = spark.createDataFrame(data, ["id", "money", "quarter"])

# Split the quarter column into year and quarter columns
df = df.withColumn("year", split(df["quarter"], "-").getItem(0))
df = df.withColumn("quarter", split(df["quarter"], "-").getItem(1))
df = df.withColumn("quarter", regexp_replace(df["quarter"], "Q", "").cast("int"))
# Add a new column for the calculated value
df = df.withColumn("new_column", (df["year"] * 4) + (df["quarter"] - 1).cast("int"))
window_spec = Window.partitionBy("id").orderBy("new_column").rangeBetween(-12, -1)
df = df.withColumn("previous_3_years_sum", sum("money").over(window_spec))

# Fill null values with 0
df = df.fillna(0, subset=["previous_3_years_sum"])
df.show()

输出:

+---+-----+-------+----+----------+--------------------+
| id|money|quarter|year|new_column|previous_3_years_sum|
+---+-----+-------+----+----------+--------------------+
|  1|   10|      1|2009|    8036.0|                   0|
|  1|   13|      1|2013|    8052.0|                   0|
|  1|   11|      1|2014|    8056.0|                  13|
|  1|   10|      1|2015|    8060.0|                  24|
|  1|   15|      1|2018|    8072.0|                  10|
|  1|   10|      1|2019|    8076.0|                  15|
+---+-----+-------+----+----------+--------------------+

相关问题