我有一个pyspark数据框架,它可以跟踪几个月内产品价格和状态的变化。这意味着只有当与上月相比发生了变化(状态或价格)时,才会创建新行,如下面的虚拟数据中所示
----------------------------------------
|product_id| status | price| month |
----------------------------------------
|1 | available | 5 | 2019-10|
----------------------------------------
|1 | available | 8 | 2020-08|
----------------------------------------
|1 | limited | 8 | 2020-10|
----------------------------------------
|2 | limited | 1 | 2020-09|
----------------------------------------
|2 | limited | 3 | 2020-10|
----------------------------------------
我想创建一个数据框,显示过去6个月的值。这意味着每当上面的Dataframe中有间隙时,我就需要复制记录。例如,如果最近6个月是2020-07、2020-08。。。2020-12,则上述Dataframe的结果应为
----------------------------------------
|product_id| status | price| month |
----------------------------------------
|1 | available | 5 | 2020-07|
----------------------------------------
|1 | available | 8 | 2020-08|
----------------------------------------
|1 | available | 8 | 2020-09|
----------------------------------------
|1 | limited | 8 | 2020-10|
----------------------------------------
|1 | limited | 8 | 2020-11|
----------------------------------------
|1 | limited | 8 | 2020-12|
----------------------------------------
|2 | limited | 1 | 2020-09|
----------------------------------------
|2 | limited | 3 | 2020-10|
----------------------------------------
|2 | limited | 3 | 2020-11|
----------------------------------------
|2 | limited | 3 | 2020-12|
----------------------------------------
请注意,对于产品\u id=1,有一个2019-10年的较旧记录,该记录被传播到2020-08年,然后被修剪,而对于产品\u id=2,在2020-09年之前没有记录,因此2020-07、2020-08月份没有为其填写(因为该产品在2020-09年之前不存在)。
由于Dataframe由数百万条记录组成,使用for循环和检查每个产品id的“暴力”解决方案相当慢。似乎应该可以使用窗口函数来解决这个问题,下个月再创建一个列,然后根据该列填补空白,但我不知道如何实现这一点。
2条答案
按热度按时间kx5bkwkv1#
关于@jxc注解,我已经为这个用例准备了答案。
下面是代码片段。
导入spark sql函数
from pyspark.sql import functions as F, Window
准备样本数据创建示例数据的dataframe
df = spark.createDataFrame(data = simpleData, schema = columns)
在dataframe中添加date列以获得正确的格式化日期创建winspec w1并使用窗口聚合函数lead查找下一个日期(w1),将其转换为前几个月以设置日期序列:
使用months#between(end#date,date)计算两个日期之间的#个月,并使用transform函数迭代序列(0,#months),创建一个命名的#结构,date=add#months(date,i)和price=if(i=0,price,price),使用inline#outer分解结构数组。
对上的Dataframe进行分区
product_id
以及在df3
获取每行的行号。然后,存储rank
包含新列的列值max_rank
对于每个product_id
储存max_rank
加入df4
```w2 = Window.partitionBy('product_id').orderBy('date')
df3 = df2.withColumn('rank',F.row_number().over(w2))
Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int]
df3.show()
+----------+---------+----------+-----+----+
|product_id| status| date|price|rank|
+----------+---------+----------+-----+----+
| 1|Available|2020-07-01| 5| 1|
| 1|Available|2020-08-01| 8| 2|
| 1|Available|2020-09-01| 8| 3|
| 1|Available|2020-10-01| 8| 4|
| 1|Available|2020-11-01| 8| 5|
| 1| Limited|2020-12-01| 8| 6|
| 2| Limited|2020-09-01| 1| 1|
| 2| Limited|2020-10-01| 1| 2|
| 2| Limited|2020-11-01| 1| 3|
| 2| Limited|2020-12-01| 3| 4|
+----------+---------+----------+-----+----+
最后过滤
df5
Dataframe使用between
函数获取最近6个月的数据。hfwmuf9z2#
使用spark sql:
给定输入Dataframe:
过滤日期窗口,即从2020-07到2020-12的6个月,并将其存储在df1中
下边界-当月份<=“2020-07”时获得最大值。将月份改写为“2020-07”
上限-使用<='2020-12'获得最大值。将月份改写为“2020-12”
现在将所有3个合并并存储在df4中
结果:使用序列(date1,date2,interval 1 month)为缺少的月份生成日期数组。分解数组得到结果。