Python3.x—季度至今的增长

uubf1zoe  于 2021-07-14  发布在  Spark
关注(0)|答案(2)|浏览(369)

这次悬赏已经结束了。回答此问题可获得+50声望奖励。赏金宽限期9小时后结束。stackq想要奖励现有答案**。

我有一些日常数据 df ,可以追溯到2020年1月1日。它看起来与下面的相似,但是有很多 id1 每天都有。

| yyyy_mm_dd | id1 | id2  | cost  |
|------------|-----|------|-------|
| 2020-01-01 | 23  | 7253 | 5003  |
| 2020-01-01 | 23  | 7743 | 30340 |
| 2020-01-02 | 23  | 7253 | 450   |
| 2020-01-02 | 23  | 7743 | 4500  |
| ...        | ... | ...  | ...   |
| 2021-01-01 | 23  | 7253 | 5675  |
| 2021-01-01 | 23  | 134  | 1030  |
| 2021-01-01 | 23  | 3445 | 564   |
| 2021-01-01 | 23  | 4534 | 345   |
| ...        | ... | ...  | ...   |

我对总成本进行了分组和计算,如下所示:

grouped_quarterly = (
    df
    .withColumn('year_quarter', (F.year(F.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
    .groupby('id1', 'year_quarter')
    .agg(
        F.sum('cost').alias('cost')
    )
)

然后,我能够成功地进行季度比较,如下所示:

w = Window.partitionBy(F.col('id1'), F.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')
growth = (
    grouped_quarterly
    .withColumn('prev_value', F.lag(F.col('cost')).over(w))
    .withColumn('diff', F.when(F.isnull(F.col('cost') - F.col('prev_value')), 0).otherwise(F.col('cost') - F.col('prev_value')))
).where(F.col('year_quarter') >= 202101)

我想修改这是一个季度至今,而不是一个季度比一个季度。例如,上面将比较2020年4月1日至2020年6月30日与2020年4月1日至2021年4月15日(或df中的任何最长日期)。
相反,我更愿意将2020年4月1日-2020年4月15日与2021年4月1日-2021年4月15日进行比较。
是否有可能确保在年度/季度内只比较相同的期间?
编辑:添加示例输出:

grouped_quarterly.where(F.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost  |
|-----|--------------|-------|
| 222 | 202001       | 49428 |
| 222 | 202002       | 43292 |
| 222 | 202003       | 73928 |
| 222 | 202004       | 12028 |
| 222 | 202101       | 19382 |
| 222 | 202102       | 4282  |

growth.where(F.col('id1') == 222).sort('year_quarter').show(10,False)

| id1 | year_quarter | cost  | prev_value | diff   | growth |
|-----|--------------|-------|------------|--------|--------|
| 222 | 202101       | 52494 | 49428      | 3066   | 6.20   |
| 222 | 202102       | 4282  | 43292      | -39010 | -90.10 |

从窗口进行的增长计算是正确的。但是,由于202102正在进行中,因此将其与完整的202002进行比较。202101年的比较非常有效,因为这两个季度都已完成。
对于不完整的季度,是否有办法确保窗口功能只比较上一年/季度内的同期?我希望样本数据能让我的问题更清楚一点

d5vmydt9

d5vmydt91#

如果你想和上一个季度进行比较,但是这个季度还没有完成,那么就做agg by dayofmonth(col("input")).alias("dayofmonth") 如果比较的本季度等于本年度的本月,则可以 .agg(when(col("date_column") condition exp)) 这里有更多的见解

gojuced7

gojuced72#

我们的想法是将任务分为两部分:
计算整个季度的增长。这个逻辑完全从问题中接管,然后
计算当前运行季度的增长。
首先生成2019q2、2020q2和2021q2的一些附加测试数据:

data = [('2019-04-01', 23, 1), ('2019-04-01', 23, 2), ('2019-04-02', 23, 3), ('2019-04-15', 23, 4),
        ('2019-04-16', 23, 5), ('2019-04-17', 23, 6), ('2019-05-01', 23, 7), ('2019-06-30', 23, 8),
        ('2019-07-01', 23, 9), ('2020-01-01',23,5003),('2020-01-01',23,30340), ('2020-01-02',23,450),
        ('2020-01-02',23,4500), ('2020-04-01', 23, 10), ('2020-04-01', 23, 20), ('2020-04-02', 23, 30),
        ('2020-04-15', 23, 40), ('2020-04-16', 23, 50), ('2020-04-17', 23, 60), ('2020-05-01', 23, 70),
        ('2020-06-30', 23, 80), ('2020-07-01', 23, 90), ('2021-01-01',23,5675), ('2021-01-01',23,1030),
        ('2021-01-01',23,564), ('2021-01-01',23,345), ('2021-04-01', 23, -10), ('2021-04-01', 23, -20),
        ('2021-04-02', 23, -30), ('2021-04-15', 23, -40)]

计算 year_quarter 列并缓存结果:

df = spark.createDataFrame(data=data, schema = ["yyyy_mm_dd", "id1", "cost"]) \
    .withColumn("yyyy_mm_dd", F.to_date("yyyy_mm_dd", "yyyy-MM-dd")) \
    .withColumn('year_quarter', (F.year(F.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd')))) \
    .cache()

获取最大日期及其对应的季度:

max_row = df.selectExpr("max(yyyy_mm_dd)", "max_by(year_quarter, yyyy_mm_dd)").head()
cur_date, cur_quarter = max_row[0], max_row[1]

严格来说没有必要设置 cur_date 到数据的最长日期。相反 cur_date 以及 cur_quarter 也可以手动设置。
对于所有方面,除了当前一个应用问题中给出的逻辑:

w = Window.partitionBy(F.col('id1'), F.expr('substring(string(year_quarter), -2)')).orderBy('year_quarter')
df_full_quarters = df.filter(f"year_quarter <> {cur_quarter}") \
    .groupby('id1', 'year_quarter') \
    .agg(F.sum('cost').alias('cost')) \
    .withColumn('prev_value', F.lag(F.col('cost')).over(w))

对于当前季度,过滤掉上一年中所有应忽略的日期:

df_cur_quarter = df.filter(f"year_quarter = {cur_quarter} or (year_quarter = {cur_quarter - 100} and add_months(yyyy_mm_dd, 12) <= '{cur_date}')") \
    .groupby('id1', 'year_quarter') \
    .agg(F.sum('cost').alias('cost')) \
    .withColumn('prev_value', F.lag(F.col('cost')).over(w)) \
    .filter(f"year_quarter = {cur_quarter}")

最后将这两部分结合起来,计算出 diff 列:

growth = df_full_quarters.union(df_cur_quarter) \
    .withColumn('diff', F.when(F.isnull(F.col('cost') - F.col('prev_value')), 0).otherwise(F.col('cost') - F.col('prev_value'))) \
    .orderBy("id1", "year_quarter")

结果将是:

+---+------------+-----+----------+------+                                      
|id1|year_quarter| cost|prev_value|  diff|
+---+------------+-----+----------+------+
| 23|      201902|   36|      null|     0|
| 23|      201903|    9|      null|     0|
| 23|      202001|40293|      null|     0|
| 23|      202002|  360|        36|   324|
| 23|      202003|   90|         9|    81|
| 23|      202101| 7614|     40293|-32679|
| 23|      202102| -100|       100|  -200|
+---+------------+-----+----------+------+

在本例中,对于2021q2与上一年的比较,2020q2的总和为100,但整个2020q2的实际值为360。

相关问题