计算季度增长

n9vozmp4  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(327)

我有一些日常数据 df ,可以追溯到2020年1月1日。它看起来与下面的相似,但是有很多 id1 每天都有。

| yyyy_mm_dd | id1 | id2  | cost  |
|------------|-----|------|-------|
| 2020-01-01 | 23  | 7253 | 5003  |
| 2020-01-01 | 23  | 7743 | 30340 |
| 2020-01-02 | 23  | 7253 | 450   |
| 2020-01-02 | 23  | 7743 | 4500  |
| ...        | ... | ...  | ...   |
| 2021-01-01 | 23  | 7253 | 5675  |
| 2021-01-01 | 23  | 134  | 1030  |
| 2021-01-01 | 23  | 3445 | 564   |
| 2021-01-01 | 23  | 4534 | 345   |
| ...        | ... | ...  | ...   |

我想计算(1)按季度和季度分组的总成本 id1 (二)同比增长%。
我对总成本进行了分组和计算,如下所示:

grouped_quarterly = (
    df
    .withColumn('year_quarter', (F.year(sf.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
    .groupby('id1', 'year_quarter')
    .agg(
        F.sum('cost').alias('cost')
    )
)

但我不确定如何与去年相比实现增长。基于上述示例的预期输出:

| year_quarter | id1 | cost | cost_growth |
|--------------|-----|------|-------------|
| 202101       | 23  | 7614 | -81         |

这也会是很好的设置 cost_growth 如果 id1 在上一季度没有行。
编辑:下面是一个尝试进行比较,但我得到一个错误,没有属性 prev_value :

grouped_quarterly = (
    df
    .withColumn('year_quarter', (F.year(sf.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
    .groupby('id1', 'year_quarter')
    .agg(
        F.sum('cost').alias('cost')
    )
)

w = Window.partitionBy('id1').orderBy('year_quarter')
growth = (
    grouped_quarterly
    .withColumn('prev_value', sf.lag(grouped_quarterly.cost).over(w))
    .withColumn('diff', sf.when(sf.isnull(grouped_quarterly.cost - grouped_quarterly.prev_value), 0).otherwise(grouped_quarterly.cost - grouped_quarterly.cost))
)

编辑#2:无论年份如何,窗口函数似乎都采用上一季度。这意味着我的 prev_value 列为上一季度,而不是上一年的同一季度:

grouped_quarterly.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost |
|-----|--------------|------|
| 222 | 202001       | 73   |
| 222 | 202002       | 246  |
| 222 | 202003       | 525  |
| 222 | 202004       | -27  |
| 222 | 202101       | 380  |

w = Window.partitionBy('id1').orderBy('year_quarter')
growth = (
    grouped_quarterly
    .withColumn('prev_value', sf.lag(sf.col('cost')).over(w))
    .withColumn('diff', sf.when(sf.isnull(sf.col('cost') - sf.col('prev_value')), 0).otherwise(sf.col('cost') - sf.col('prev_value')))
)

growth.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost | prev_value | diff |
|-----|--------------|------|------------|------|
| 222 | 202001       | 73   | null       | 0    |
| 222 | 202002       | 246  | 73         | 173  |
| 222 | 202003       | 525  | 246        | 279  |
| 222 | 202004       | -27  | 525        | -522 |
| 222 | 202101       | 380  | -27        | 407  |

编辑#3:在分区中使用四分之一将导致空值 prev_value 对于所有行:

grouped_quarterly.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost |
|-----|--------------|------|
| 222 | 202001       | 73   |
| 222 | 202002       | 246  |
| 222 | 202003       | 525  |
| 222 | 202004       | -27  |
| 222 | 202101       | 380  |

w = Window.partitionBy(sf.col('id1'), sf.expr('substring(string(year_quarter), 2)')).orderBy('year_quarter')
growth = (
    grouped_quarterly
    .withColumn('prev_value', sf.lag(sf.col('cost')).over(w))
    .withColumn('diff', sf.when(sf.isnull(sf.col('cost') - sf.col('prev_value')), 0).otherwise(sf.col('cost') - sf.col('prev_value')))
)

growth.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost | prev_value | diff  |
|-----|--------------|------|------------|-------|
| 222 | 202001       | 73   | null       | 0     |
| 222 | 202002       | 246  | null       | 0     |
| 222 | 202003       | 525  | null       | 0     |
| 222 | 202004       | -27  | null       | 0     |
| 222 | 202101       | 380  | null       | 0     |
xoshrz7s

xoshrz7s1#

也可以尝试在分区中使用四分之一,这样 lag 将为您提供去年同一季度的价值:

w = Window.partitionBy(sf.col('id1'), sf.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')

相关问题