用零填充缺失的销售值,并在pyspark中计算3个月的平均值

bkhjykvo  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(536)

我想加上零销售额缺失值,并计算3个月平均Pypark

  1. My Input :
  2. product specialty date sales
  3. A pharma 1/3/2019 50
  4. A pharma 1/4/2019 60
  5. A pharma 1/5/2019 70
  6. A pharma 1/8/2019 80
  7. A ENT 1/8/2019 50
  8. A ENT 1/9/2019 65
  9. A ENT 1/11/2019 40
  10. my output:
  11. product specialty date sales 3month_avg_sales
  12. A pharma 1/3/2019 50 16.67
  13. A pharma 1/4/2019 60 36.67
  14. A pharma 1/5/2019 70 60
  15. A pharma 1/6/2019 0 43.33
  16. A pharma 1/7/2019 0 23.33
  17. A pharma 1/8/2019 80 26.67
  18. A ENT 1/8/2019 50 16.67
  19. A ENT 1/9/2019 65 38.33
  20. A ENT 1/10/2019 0 38.33
  21. A ENT 1/11/2019 40 35
  22. row = Row("Product", "specialty","Date", "Sales")
  23. df = sc.parallelize([row("A","pharma", "1/3/2019", 50),row("A","pharma", "1/4/2019", 60),row("A", "pharma","01/05/2019", 70),row("A","pharma", "1/8/2019", 80),row("A","ENT", "1/8/2019", 50),row("A","ENT", "1/9/2019", 65),row("A","ENT", "1/11/2019", 40)]).toDF()
  24. w = Window.partitionBy("product","specialty).orderBy("date")
  25. df.withColumn("new_data_date", expr("add_months(data_date, 1)"))
  26. df.withcolumn("sales",F.where(col("date") isin col("new_data_date")
  27. df=df.withColumn('index', (year('Date') - 2020) * 12 + month('Date')).withColumn('avg',sum('Sales').over(w) / 3)

我惊讶地加上任何一个日期价值是错过与销售价值为零。计算3个月平均值。

guz6ccqo

guz6ccqo1#

您可以使用sparksql内置函数transform+sequence来创建缺少的月份,并将其sales设置为0,使用window aggregate函数来计算所需的月份 end_date 以及最后3个月的平均销售额。下面我将代码分为三个步骤进行说明,您可以根据自己的需求合并它们。
注意:假设每个不同的月份最多有一条记录,并且所有的日期值都是day=1,否则使用 F.trunc(F.to_date('date', 'd/M/yyyy'), "month") 和/或定义重复条目的逻辑。

  1. from pyspark.sql import functions as F, Window
  2. df = spark.createDataFrame([
  3. ('A', 'pharma', '1/3/2019', 50), ('A', 'pharma', '1/4/2019', 60),
  4. ('A', 'pharma', '1/5/2019', 70), ('A', 'pharma', '1/8/2019', 80),
  5. ('A', 'ENT', '1/8/2019', 50), ('A', 'ENT', '1/9/2019', 65),
  6. ('A', 'ENT', '1/11/2019', 40)
  7. ], ['product', 'specialty', 'date', 'sales'])
  8. df = df.withColumn('date', F.to_date('date', 'd/M/yyyy'))

步骤1:设置winspec w1 并使用窗口聚合函数lead查找下一个日期(w1),将其转换为前几个月,以设置日期序列:

  1. w1 = Window.partitionBy('product', 'specialty').orderBy('date')
  2. df1 = df.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
  3. +-------+---------+----------+-----+----------+
  4. |product|specialty| date|sales| end_date|
  5. +-------+---------+----------+-----+----------+
  6. | A| ENT|2019-08-01| 50|2019-08-01|
  7. | A| ENT|2019-09-01| 65|2019-10-01|
  8. | A| ENT|2019-11-01| 40|2019-11-01|
  9. | A| pharma|2019-03-01| 50|2019-03-01|
  10. | A| pharma|2019-04-01| 60|2019-04-01|
  11. | A| pharma|2019-05-01| 70|2019-07-01|
  12. | A| pharma|2019-08-01| 80|2019-08-01|
  13. +-------+---------+----------+-----+----------+

第二步:使用 months_between(end_date, date) 计算两个日期之间的月数,并使用转换函数迭代 sequence(0, #months) ,创建带有日期的命名结构= add_months(date,i) 和销售= IF(i=0,sales,0) ,使用inline\u outer分解结构数组:

  1. df2 = df1.selectExpr("product", "specialty", """
  2. inline_outer(
  3. transform(
  4. sequence(0,int(months_between(end_date, date))),
  5. i -> (add_months(date,i) as date, IF(i=0,sales,0) as sales)
  6. )
  7. )
  8. """)
  9. +-------+---------+----------+-----+
  10. |product|specialty| date|sales|
  11. +-------+---------+----------+-----+
  12. | A| ENT|2019-08-01| 50|
  13. | A| ENT|2019-09-01| 65|
  14. | A| ENT|2019-10-01| 0|
  15. | A| ENT|2019-11-01| 40|
  16. | A| pharma|2019-03-01| 50|
  17. | A| pharma|2019-04-01| 60|
  18. | A| pharma|2019-05-01| 70|
  19. | A| pharma|2019-06-01| 0|
  20. | A| pharma|2019-07-01| 0|
  21. | A| pharma|2019-08-01| 80|
  22. +-------+---------+----------+-----+

步骤3:使用以下winspec w2 以及计算平均值的聚合函数:

  1. N = 3
  2. w2 = Window.partitionBy('product', 'specialty').orderBy('date').rowsBetween(-N+1,0)
  3. df_new = df2.select("*", F.round(F.sum('sales').over(w2)/N,2).alias(f'{N}month_avg_sales'))
  4. +-------+---------+----------+-----+----------------+
  5. |product|specialty| date|sales|3month_avg_sales|
  6. +-------+---------+----------+-----+----------------+
  7. | A| ENT|2019-08-01| 50| 16.67|
  8. | A| ENT|2019-09-01| 65| 38.33|
  9. | A| ENT|2019-10-01| 0| 38.33|
  10. | A| ENT|2019-11-01| 40| 35.0|
  11. | A| pharma|2019-03-01| 50| 16.67|
  12. | A| pharma|2019-04-01| 60| 36.67|
  13. | A| pharma|2019-05-01| 70| 60.0|
  14. | A| pharma|2019-06-01| 0| 43.33|
  15. | A| pharma|2019-07-01| 0| 23.33|
  16. | A| pharma|2019-08-01| 80| 26.67|
  17. +-------+---------+----------+-----+----------------+
展开查看全部
lbsnaicq

lbsnaicq2#

对于缺失的值,你可以这样做

  1. df.fillna(0, subset=['sales'])

对于3个月的平均值,您可以在这里找到一个很好的答案,只需小心正确解析时间戳并将窗口起始日更改为-90
更新
这段代码应该可以完成你想要的工作

  1. days = lambda i: i * 86400
  2. w = (Window.orderBy(f.col("timestampGMT").cast('long')).rangeBetween(-days(90), 0))
  3. missings_df = sparkSession.createDataFrame([ ('A', 'pharma', '1/6/2019', 0)], ['product', 'specialty', 'date', 'sales'])
  4. df = (df
  5. .union(missings_df) # adding missing row
  6. .withColumn('timestampGMT', f.to_date('date', 'd/M/yyyy').cast('timestamp')) # cast to timestamp
  7. .withColumn('rolling_average', f.avg("sales").over(w)) # rolling average on 90 days
  8. )
展开查看全部

相关问题