我有一些日常数据 df
,可以追溯到2020年1月1日。它看起来与下面的相似,但是有很多 id1
每天都有。
| yyyy_mm_dd | id1 | id2 | cost |
|------------|-----|------|-------|
| 2020-01-01 | 23 | 7253 | 5003 |
| 2020-01-01 | 23 | 7743 | 30340 |
| 2020-01-02 | 23 | 7253 | 450 |
| 2020-01-02 | 23 | 7743 | 4500 |
| ... | ... | ... | ... |
| 2021-01-01 | 23 | 7253 | 5675 |
| 2021-01-01 | 23 | 134 | 1030 |
| 2021-01-01 | 23 | 3445 | 564 |
| 2021-01-01 | 23 | 4534 | 345 |
| ... | ... | ... | ... |
我想计算(1)按季度和季度分组的总成本 id1
(二)同比增长%。
我对总成本进行了分组和计算,如下所示:
grouped_quarterly = (
df
.withColumn('year_quarter', (F.year(sf.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
.groupby('id1', 'year_quarter')
.agg(
F.sum('cost').alias('cost')
)
)
但我不确定如何与去年相比实现增长。基于上述示例的预期输出:
| year_quarter | id1 | cost | cost_growth |
|--------------|-----|------|-------------|
| 202101 | 23 | 7614 | -81 |
这也会是很好的设置 cost_growth
如果 id1
在上一季度没有行。
编辑:下面是一个尝试进行比较,但我得到一个错误,没有属性 prev_value
:
grouped_quarterly = (
df
.withColumn('year_quarter', (F.year(sf.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
.groupby('id1', 'year_quarter')
.agg(
F.sum('cost').alias('cost')
)
)
w = Window.partitionBy('id1').orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', sf.lag(grouped_quarterly.cost).over(w))
.withColumn('diff', sf.when(sf.isnull(grouped_quarterly.cost - grouped_quarterly.prev_value), 0).otherwise(grouped_quarterly.cost - grouped_quarterly.cost))
)
编辑#2:无论年份如何,窗口函数似乎都采用上一季度。这意味着我的 prev_value
列为上一季度,而不是上一年的同一季度:
grouped_quarterly.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost |
|-----|--------------|------|
| 222 | 202001 | 73 |
| 222 | 202002 | 246 |
| 222 | 202003 | 525 |
| 222 | 202004 | -27 |
| 222 | 202101 | 380 |
w = Window.partitionBy('id1').orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', sf.lag(sf.col('cost')).over(w))
.withColumn('diff', sf.when(sf.isnull(sf.col('cost') - sf.col('prev_value')), 0).otherwise(sf.col('cost') - sf.col('prev_value')))
)
growth.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost | prev_value | diff |
|-----|--------------|------|------------|------|
| 222 | 202001 | 73 | null | 0 |
| 222 | 202002 | 246 | 73 | 173 |
| 222 | 202003 | 525 | 246 | 279 |
| 222 | 202004 | -27 | 525 | -522 |
| 222 | 202101 | 380 | -27 | 407 |
编辑#3:在分区中使用四分之一将导致空值 prev_value
对于所有行:
grouped_quarterly.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost |
|-----|--------------|------|
| 222 | 202001 | 73 |
| 222 | 202002 | 246 |
| 222 | 202003 | 525 |
| 222 | 202004 | -27 |
| 222 | 202101 | 380 |
w = Window.partitionBy(sf.col('id1'), sf.expr('substring(string(year_quarter), 2)')).orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', sf.lag(sf.col('cost')).over(w))
.withColumn('diff', sf.when(sf.isnull(sf.col('cost') - sf.col('prev_value')), 0).otherwise(sf.col('cost') - sf.col('prev_value')))
)
growth.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost | prev_value | diff |
|-----|--------------|------|------------|-------|
| 222 | 202001 | 73 | null | 0 |
| 222 | 202002 | 246 | null | 0 |
| 222 | 202003 | 525 | null | 0 |
| 222 | 202004 | -27 | null | 0 |
| 222 | 202101 | 380 | null | 0 |
1条答案
按热度按时间xoshrz7s1#
也可以尝试在分区中使用四分之一,这样
lag
将为您提供去年同一季度的价值: