用零填充缺失的销售值,并在pyspark中计算3个月的平均值

bkhjykvo  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(508)

我想加上零销售额缺失值,并计算3个月平均Pypark

My Input :
  product    specialty    date       sales
  A           pharma      1/3/2019    50
  A           pharma      1/4/2019    60
  A           pharma      1/5/2019    70
  A           pharma      1/8/2019    80
  A           ENT         1/8/2019    50
  A           ENT         1/9/2019    65
  A           ENT         1/11/2019   40

my output:
   product    specialty    date       sales    3month_avg_sales
   A           pharma      1/3/2019    50       16.67
   A           pharma      1/4/2019    60       36.67
   A           pharma      1/5/2019    70        60
   A           pharma      1/6/2019     0        43.33
   A           pharma      1/7/2019     0        23.33
   A           pharma      1/8/2019    80        26.67
   A           ENT         1/8/2019    50        16.67
   A           ENT         1/9/2019    65        38.33
   A           ENT         1/10/2019    0        38.33  
   A           ENT         1/11/2019   40        35

row = Row("Product", "specialty","Date", "Sales")
df = sc.parallelize([row("A","pharma", "1/3/2019", 50),row("A","pharma", "1/4/2019", 60),row("A", "pharma","01/05/2019", 70),row("A","pharma", "1/8/2019", 80),row("A","ENT", "1/8/2019", 50),row("A","ENT", "1/9/2019", 65),row("A","ENT", "1/11/2019", 40)]).toDF()
w = Window.partitionBy("product","specialty).orderBy("date")
df.withColumn("new_data_date", expr("add_months(data_date, 1)"))
df.withcolumn("sales",F.where(col("date") isin col("new_data_date")
 df=df.withColumn('index', (year('Date') - 2020) * 12 + month('Date')).withColumn('avg',sum('Sales').over(w) / 3)

我惊讶地加上任何一个日期价值是错过与销售价值为零。计算3个月平均值。

guz6ccqo

guz6ccqo1#

您可以使用sparksql内置函数transform+sequence来创建缺少的月份,并将其sales设置为0,使用window aggregate函数来计算所需的月份 end_date 以及最后3个月的平均销售额。下面我将代码分为三个步骤进行说明,您可以根据自己的需求合并它们。
注意:假设每个不同的月份最多有一条记录,并且所有的日期值都是day=1,否则使用 F.trunc(F.to_date('date', 'd/M/yyyy'), "month") 和/或定义重复条目的逻辑。

from pyspark.sql import functions as F, Window

df = spark.createDataFrame([
    ('A', 'pharma', '1/3/2019', 50), ('A', 'pharma', '1/4/2019', 60), 
    ('A', 'pharma', '1/5/2019', 70), ('A', 'pharma', '1/8/2019', 80), 
    ('A', 'ENT', '1/8/2019', 50), ('A', 'ENT', '1/9/2019', 65),
    ('A', 'ENT', '1/11/2019', 40)
], ['product', 'specialty', 'date', 'sales'])

df = df.withColumn('date', F.to_date('date', 'd/M/yyyy'))

步骤1:设置winspec w1 并使用窗口聚合函数lead查找下一个日期(w1),将其转换为前几个月,以设置日期序列:

w1 = Window.partitionBy('product', 'specialty').orderBy('date')

df1 = df.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
+-------+---------+----------+-----+----------+
|product|specialty|      date|sales|  end_date|
+-------+---------+----------+-----+----------+
|      A|      ENT|2019-08-01|   50|2019-08-01|
|      A|      ENT|2019-09-01|   65|2019-10-01|
|      A|      ENT|2019-11-01|   40|2019-11-01|
|      A|   pharma|2019-03-01|   50|2019-03-01|
|      A|   pharma|2019-04-01|   60|2019-04-01|
|      A|   pharma|2019-05-01|   70|2019-07-01|
|      A|   pharma|2019-08-01|   80|2019-08-01|
+-------+---------+----------+-----+----------+

第二步:使用 months_between(end_date, date) 计算两个日期之间的月数,并使用转换函数迭代 sequence(0, #months) ,创建带有日期的命名结构= add_months(date,i) 和销售= IF(i=0,sales,0) ,使用inline\u outer分解结构数组:

df2 = df1.selectExpr("product", "specialty", """
       inline_outer(
         transform(
            sequence(0,int(months_between(end_date, date))),
            i -> (add_months(date,i) as date, IF(i=0,sales,0) as sales)
         )
       )
   """)
+-------+---------+----------+-----+
|product|specialty|      date|sales|
+-------+---------+----------+-----+
|      A|      ENT|2019-08-01|   50|
|      A|      ENT|2019-09-01|   65|
|      A|      ENT|2019-10-01|    0|
|      A|      ENT|2019-11-01|   40|
|      A|   pharma|2019-03-01|   50|
|      A|   pharma|2019-04-01|   60|
|      A|   pharma|2019-05-01|   70|
|      A|   pharma|2019-06-01|    0|
|      A|   pharma|2019-07-01|    0|
|      A|   pharma|2019-08-01|   80|
+-------+---------+----------+-----+

步骤3:使用以下winspec w2 以及计算平均值的聚合函数:

N = 3

w2 = Window.partitionBy('product', 'specialty').orderBy('date').rowsBetween(-N+1,0)

df_new = df2.select("*", F.round(F.sum('sales').over(w2)/N,2).alias(f'{N}month_avg_sales'))
+-------+---------+----------+-----+----------------+
|product|specialty|      date|sales|3month_avg_sales|
+-------+---------+----------+-----+----------------+
|      A|      ENT|2019-08-01|   50|           16.67|
|      A|      ENT|2019-09-01|   65|           38.33|
|      A|      ENT|2019-10-01|    0|           38.33|
|      A|      ENT|2019-11-01|   40|            35.0|
|      A|   pharma|2019-03-01|   50|           16.67|
|      A|   pharma|2019-04-01|   60|           36.67|
|      A|   pharma|2019-05-01|   70|            60.0|
|      A|   pharma|2019-06-01|    0|           43.33|
|      A|   pharma|2019-07-01|    0|           23.33|
|      A|   pharma|2019-08-01|   80|           26.67|
+-------+---------+----------+-----+----------------+
lbsnaicq

lbsnaicq2#

对于缺失的值,你可以这样做

df.fillna(0, subset=['sales'])

对于3个月的平均值,您可以在这里找到一个很好的答案,只需小心正确解析时间戳并将窗口起始日更改为-90
更新
这段代码应该可以完成你想要的工作

days = lambda i: i * 86400
w = (Window.orderBy(f.col("timestampGMT").cast('long')).rangeBetween(-days(90), 0))

missings_df = sparkSession.createDataFrame([ ('A', 'pharma', '1/6/2019', 0)], ['product', 'specialty', 'date', 'sales'])

df = (df
      .union(missings_df) # adding missing row
      .withColumn('timestampGMT', f.to_date('date', 'd/M/yyyy').cast('timestamp')) # cast to timestamp
      .withColumn('rolling_average', f.avg("sales").over(w)) # rolling average on 90 days
     )

相关问题