对于每个日期x,对于z列中缺少的每个唯一值y,创建一行,其中date=x,z=y的最新值

l2osamch  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(290)

我有一个Pypark数据框 Date 什么东西 Stock 以特定的方式 Size . 表的复合键是[date,size]。考虑到物品已经 n 尺寸,每个日期0到 n 行可以存在。

input = spark.createDataFrame([
    # Day 1: Row for all sizes
    [1, 1, 10],
    [1, 2, 10],
    [1, 3, 10],
    # Day 2: Row for one size
    [2, 1, 8],
    # Day 3: Row for no size
    # Day 4: Row for two sizes
    [4, 1, 7],
    [4, 2, 9],
], ["Date", "Size", "Stock"])

例如,在第二天,总共售出了两件1号的商品,将库存从10件减少到8件。当天没有1号或3号的交易。
我想计算一下 Stock 该项目的每个 Sizes 在每个 Date . 预期输出如下所示:

expected = spark.createDataFrame([
    # Day 1
    [1, 1, 10],
    [1, 2, 10],
    [1, 3, 10],
    # Day 2
    [2, 1, 8],
    [2, 2, 10],
    [2, 3, 10],
    # Day 3
    [3, 1, 8],
    [3, 2, 10],
    [3, 3, 10],
    # Day 4
    [4, 1, 7],
    [4, 2, 9],
    [4, 3, 10],
], ["Date", "Size", "Stock"])

我怎样才能做到这一点?

eqoofvh9

eqoofvh91#

其思想是从最小和最大日期生成一系列日期,交叉连接以获得日期和大小的组合列表,左连接到原始Dataframe,并使用 lastignoreNulls 设置为 True .

from pyspark.sql import functions as F, Window

df = input.agg(F.expr('sequence(min(Date), max(Date)) as Date')).select(F.explode('Date').alias('Date'))

result = df.crossJoin(
    input.select('Size').distinct().repartition(10)
).join(
    input, 
    ['Date', 'Size'], 
    'left'
).withColumn(
    'Stock', 
    F.last('Stock', True).over(Window.partitionBy('Size').orderBy('Date'))
)

result.orderBy('Date', 'Size').show()
+----+----+-----+
|Date|Size|Stock|
+----+----+-----+
|   1|   1|   10|
|   1|   2|   10|
|   1|   3|   10|
|   2|   1|    8|
|   2|   2|   10|
|   2|   3|   10|
|   3|   1|    8|
|   3|   2|   10|
|   3|   3|   10|
|   4|   1|    7|
|   4|   2|    9|
|   4|   3|   10|
+----+----+-----+

相关问题