对于每个日期x,对于z列中缺少的每个唯一值y,创建一行,其中date=x,z=y的最新值

l2osamch  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(325)

我有一个Pypark数据框 Date 什么东西 Stock 以特定的方式 Size . 表的复合键是[date,size]。考虑到物品已经 n 尺寸,每个日期0到 n 行可以存在。

  1. input = spark.createDataFrame([
  2. # Day 1: Row for all sizes
  3. [1, 1, 10],
  4. [1, 2, 10],
  5. [1, 3, 10],
  6. # Day 2: Row for one size
  7. [2, 1, 8],
  8. # Day 3: Row for no size
  9. # Day 4: Row for two sizes
  10. [4, 1, 7],
  11. [4, 2, 9],
  12. ], ["Date", "Size", "Stock"])

例如,在第二天,总共售出了两件1号的商品,将库存从10件减少到8件。当天没有1号或3号的交易。
我想计算一下 Stock 该项目的每个 Sizes 在每个 Date . 预期输出如下所示:

  1. expected = spark.createDataFrame([
  2. # Day 1
  3. [1, 1, 10],
  4. [1, 2, 10],
  5. [1, 3, 10],
  6. # Day 2
  7. [2, 1, 8],
  8. [2, 2, 10],
  9. [2, 3, 10],
  10. # Day 3
  11. [3, 1, 8],
  12. [3, 2, 10],
  13. [3, 3, 10],
  14. # Day 4
  15. [4, 1, 7],
  16. [4, 2, 9],
  17. [4, 3, 10],
  18. ], ["Date", "Size", "Stock"])

我怎样才能做到这一点?

eqoofvh9

eqoofvh91#

其思想是从最小和最大日期生成一系列日期,交叉连接以获得日期和大小的组合列表,左连接到原始Dataframe,并使用 lastignoreNulls 设置为 True .

  1. from pyspark.sql import functions as F, Window
  2. df = input.agg(F.expr('sequence(min(Date), max(Date)) as Date')).select(F.explode('Date').alias('Date'))
  3. result = df.crossJoin(
  4. input.select('Size').distinct().repartition(10)
  5. ).join(
  6. input,
  7. ['Date', 'Size'],
  8. 'left'
  9. ).withColumn(
  10. 'Stock',
  11. F.last('Stock', True).over(Window.partitionBy('Size').orderBy('Date'))
  12. )
  13. result.orderBy('Date', 'Size').show()
  14. +----+----+-----+
  15. |Date|Size|Stock|
  16. +----+----+-----+
  17. | 1| 1| 10|
  18. | 1| 2| 10|
  19. | 1| 3| 10|
  20. | 2| 1| 8|
  21. | 2| 2| 10|
  22. | 2| 3| 10|
  23. | 3| 1| 8|
  24. | 3| 2| 10|
  25. | 3| 3| 10|
  26. | 4| 1| 7|
  27. | 4| 2| 9|
  28. | 4| 3| 10|
  29. +----+----+-----+
展开查看全部

相关问题