我想加上零销售额缺失值,并计算3个月平均Pypark
My Input :
product specialty date sales
A pharma 1/3/2019 50
A pharma 1/4/2019 60
A pharma 1/5/2019 70
A pharma 1/8/2019 80
A ENT 1/8/2019 50
A ENT 1/9/2019 65
A ENT 1/11/2019 40
my output:
product specialty date sales 3month_avg_sales
A pharma 1/3/2019 50 16.67
A pharma 1/4/2019 60 36.67
A pharma 1/5/2019 70 60
A pharma 1/6/2019 0 43.33
A pharma 1/7/2019 0 23.33
A pharma 1/8/2019 80 26.67
A ENT 1/8/2019 50 16.67
A ENT 1/9/2019 65 38.33
A ENT 1/10/2019 0 38.33
A ENT 1/11/2019 40 35
row = Row("Product", "specialty","Date", "Sales")
df = sc.parallelize([row("A","pharma", "1/3/2019", 50),row("A","pharma", "1/4/2019", 60),row("A", "pharma","01/05/2019", 70),row("A","pharma", "1/8/2019", 80),row("A","ENT", "1/8/2019", 50),row("A","ENT", "1/9/2019", 65),row("A","ENT", "1/11/2019", 40)]).toDF()
w = Window.partitionBy("product","specialty).orderBy("date")
df.withColumn("new_data_date", expr("add_months(data_date, 1)"))
df.withcolumn("sales",F.where(col("date") isin col("new_data_date")
df=df.withColumn('index', (year('Date') - 2020) * 12 + month('Date')).withColumn('avg',sum('Sales').over(w) / 3)
我惊讶地加上任何一个日期价值是错过与销售价值为零。计算3个月平均值。
2条答案
按热度按时间guz6ccqo1#
您可以使用sparksql内置函数transform+sequence来创建缺少的月份,并将其sales设置为0,使用window aggregate函数来计算所需的月份
end_date
以及最后3个月的平均销售额。下面我将代码分为三个步骤进行说明,您可以根据自己的需求合并它们。注意:假设每个不同的月份最多有一条记录,并且所有的日期值都是day=1,否则使用
F.trunc(F.to_date('date', 'd/M/yyyy'), "month")
和/或定义重复条目的逻辑。步骤1:设置winspec
w1
并使用窗口聚合函数lead查找下一个日期(w1),将其转换为前几个月,以设置日期序列:第二步:使用
months_between(end_date, date)
计算两个日期之间的月数,并使用转换函数迭代sequence(0, #months)
,创建带有日期的命名结构=add_months(date,i)
和销售=IF(i=0,sales,0)
,使用inline\u outer分解结构数组:步骤3:使用以下winspec
w2
以及计算平均值的聚合函数:lbsnaicq2#
对于缺失的值,你可以这样做
对于3个月的平均值,您可以在这里找到一个很好的答案,只需小心正确解析时间戳并将窗口起始日更改为-90
更新
这段代码应该可以完成你想要的工作