Forward fill all columns of a PySpark DataFrame

Asked by 30byixjq on 2021-05-27, in Spark

I have the following problem: I have a dataset that tracks changes of a status.

```
id  valid  eventdate
1   False  2020-05-01
1   True   2020-05-06
2   True   2020-05-04
2   False  2020-05-07
2   True   2020-05-09
3   False  2020-05-11
```

Goal:

```
SELECT valid FROM table WHERE id = 1 AND eventdate = "2020-05-05"
```

I need to know the status on any given day between the start date and today. For example, on May 5th the status of id 1 is still False.
In Pandas I have a solution like the one below, where I use pivot together with ffill to fill the null values, and then melt to reshape it back into a three-column DataFrame.

```
from datetime import datetime
import pandas as pd

test_data = [
    [1, "False", "2020-05-01"],
    [1, "True", "2020-05-06"],
    [2, "True", "2020-05-04"],
    [2, "False", "2020-05-07"],
    [2, "True", "2020-05-09"],
    [3, "False", "2020-05-11"]
]

# Create input frame
df = pd.DataFrame(test_data, columns=['id', 'valid', 'eventdate'])
df['id'] = df['id'].astype(str)
df['valid'] = df['valid'] == "True"
df['eventdate'] = pd.to_datetime(df['eventdate'])
print(df.head(6))
#   id  valid  eventdate
# 0  1  False 2020-05-01
# 1  1   True 2020-05-06
# 2  2   True 2020-05-04
# 3  2  False 2020-05-07
# 4  2   True 2020-05-09
# 5  3  False 2020-05-11

# Create full time range as frame
timeframe = pd.date_range(start=min(df['eventdate']),
                          end=datetime.now().date()).to_frame().reset_index(drop=True).rename(columns={0: 'eventdate'})
print(timeframe.head())
#    eventdate
# 0 2020-05-01
# 1 2020-05-02
# 2 2020-05-03
# 3 2020-05-04
# 4 2020-05-05

# Merge timeframe into original frame
df = df.merge(timeframe,
              left_on='eventdate',
              right_on='eventdate',
              how='right')
print(df.sort_values('eventdate').head())
#     id  valid  eventdate
# 0    1  False 2020-05-01
# 6  NaN    NaN 2020-05-02
# 7  NaN    NaN 2020-05-03
# 2    2   True 2020-05-04
# 8  NaN    NaN 2020-05-05

# 1. Pivot to get dates on rows and ids as columns
# 2. Forward fill values per id
# 3. Fill remaining NaNs with False
df = df.pivot(index='eventdate',
              columns='id',
              values='valid')\
       .fillna(method='ffill')\
       .fillna(False)
print(df.head())
# id           NaN      1      2      3
# eventdate
# 2020-05-01  False  False  False  False
# 2020-05-02  False  False  False  False
# 2020-05-03  False  False  False  False
# 2020-05-04  False  False   True  False
# 2020-05-05  False  False   True  False

# Drop NaN column and reset the index
df = df.loc[:, df.columns.notnull()].reset_index()

# Melt the columns back
out = pd.melt(df,
              id_vars='eventdate',
              value_name='valid')
print(out.head(10))
#    eventdate id  valid
# 0 2020-05-01  1  False
# 1 2020-05-02  1  False
# 2 2020-05-03  1  False
# 3 2020-05-04  1  False
# 4 2020-05-05  1  False
# 5 2020-05-06  1   True
# 6 2020-05-07  1   True
# 7 2020-05-08  1   True
# 8 2020-05-09  1   True
# 9 2020-05-10  1   True
```

I am trying to achieve the same thing in Spark, but a forward fill does not exist there. I do know how to get the latest status per id:

```
w = Window().partitionBy("id").orderBy(F.col("eventdate").desc())
df.withColumn("rn", F.row_number().over(w)) \
  .where(F.col("rn") == 1) \
  .selectExpr("id", "valid", "eventdate AS last_change") \
  .dropna() \
  .show()
```

The pivot can be done with:

```
df\
  .select(["id", "valid", "eventdate"])\
  .groupBy(["eventdate"])\
  .pivot("id")\
  .agg(F.min("valid"))\
  .drop('null')\
  .sort('eventdate')\
  .show()
```

To do the forward fill, I limited the dataset to a single id:

```
import sys
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window

test_data = [
    [1, "False", "2020-05-01"],
    [1, "True", "2020-05-06"],
    [2, "True", "2020-05-04"],
    [2, "False", "2020-05-07"],
    [2, "True", "2020-05-09"],
    [3, "False", "2020-05-11"]
]

# Create dataframe
df = sc\
    .parallelize(test_data)\
    .toDF(("id", "valid", "eventdate"))\
    .withColumn("eventdate", F.to_date(F.to_timestamp("eventdate")))\
    .withColumn("valid", F.when(F.col("valid") == "True", 1).otherwise(0))
df.createOrReplaceTempView("df")

# Create event frame
event_dates = spark.sql("SELECT sequence(min(eventdate), CURRENT_DATE(), interval 1 day) as eventdate FROM df")\
    .withColumn("eventdate",
                F.explode(F.col("eventdate")))

# Join dates and data
df = df.join(event_dates, on='eventdate', how='right')
df2 = df.where(df.id == 1)\
    .join(event_dates, on='eventdate', how='right')\
    .withColumn('id', F.lit(1))
# df2.sort('eventdate').show()
# +----------+---+-----+
# | eventdate| id|valid|
# +----------+---+-----+
# |2020-05-01|  1|    0|
# |2020-05-02|  1| null|
# |2020-05-03|  1| null|
# |2020-05-04|  1| null|
# |2020-05-05|  1| null|
# |2020-05-06|  1|    1|
# |2020-05-07|  1| null|
# |2020-05-08|  1| null|
# |2020-05-09|  1| null|
# |2020-05-10|  1| null|
# |2020-05-11|  1| null|
# |2020-05-12|  1| null|
# |2020-05-13|  1| null|
# +----------+---+-----+

# Forward fill
window = Window.partitionBy('id')\
    .orderBy('eventdate')\
    .rowsBetween(-sys.maxsize, 0)

# Set filter
read_last = F.last(df2['valid'], ignorenulls=True).over(window)
df2.withColumn("ffill", read_last).show()
# +----------+---+-----+-----+
# | eventdate| id|valid|ffill|
# +----------+---+-----+-----+
# |2020-05-01|  1|    0|    0|
# |2020-05-02|  1| null|    0|
# |2020-05-03|  1| null|    0|
# |2020-05-04|  1| null|    0|
# |2020-05-05|  1| null|    0|
# |2020-05-06|  1|    1|    1|
# |2020-05-07|  1| null|    1|
# |2020-05-08|  1| null|    1|
# |2020-05-09|  1| null|    1|
# |2020-05-10|  1| null|    1|
# |2020-05-11|  1| null|    1|
# |2020-05-12|  1| null|    1|
# |2020-05-13|  1| null|    1|
# +----------+---+-----+-----+
```

The first question, I think, is whether this way of answering the query is the right approach at all. Doing the pivot creates a long table with only a few columns while storing a lot of redundant data. Maybe Spark is not the right tool for this, or rather, the problem itself is not well suited to Spark. I know that ideally you would use parallel processing, perhaps broadcasting the timeframe and computing the forward fill per id on each node (something like the sketch below)?
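For what it's worth, a minimal sketch of that last idea without pivoting: build one row per (id, eventdate) combination and forward fill per id with last(..., ignorenulls=True), so Spark can parallelize over the id partitions. It assumes events is the original three-column frame (id, valid, eventdate, before the right join above) and event_dates is the exploded date range; the names events, full_grid and filled are mine, not part of the original code.

```
# Sketch: forward fill for all ids at once, without pivoting.
# Assumes `events` = original (id, valid, eventdate) frame and
# `event_dates` = the exploded date range built above.
from pyspark.sql import Window
import pyspark.sql.functions as F

# One row per (id, eventdate) combination
full_grid = events.select("id").distinct().crossJoin(event_dates)

# Attach the known status changes; days without an event stay null
joined = full_grid.join(events, on=["id", "eventdate"], how="left")

# Forward fill per id over the ordered dates
w = (Window.partitionBy("id")
           .orderBy("eventdate")
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))
filled = joined.withColumn("valid_ffill",
                           F.last("valid", ignorenulls=True).over(w))

filled.orderBy("id", "eventdate").show()
```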
Or would it be better to use a different approach altogether, for example storing an enddate and querying it like this:

```
id  valid  eventdate   enddate
1   False  2020-05-01  2020-05-06
1   True   2020-05-06  2999-12-31
2   True   2020-05-04  2020-05-07
2   False  2020-05-07  2020-05-08
2   True   2020-05-09  2999-12-31
3   False  2020-05-11  2999-12-31
```

and

```
SELECT valid FROM table WHERE id = 1 AND "2020-05-05" BETWEEN eventdate AND enddate
```
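For completeness, a minimal sketch of how such an enddate column could be derived with a lead window, following the table above (the frame name events, the view name status_ranges and the 2999-12-31 sentinel are assumptions on my side):

```
# Sketch: enddate = date of the next status change per id,
# falling back to a far-future sentinel for the current status.
from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy("id").orderBy("eventdate")

ranges = events.withColumn(
    "enddate",
    F.coalesce(F.lead("eventdate").over(w), F.to_date(F.lit("2999-12-31")))
)
ranges.createOrReplaceTempView("status_ranges")

spark.sql("""
    SELECT valid FROM status_ranges
    WHERE id = 1 AND to_date('2020-05-05') BETWEEN eventdate AND enddate
""").show()
```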

Please let me know whether the Spark approach makes sense here, and what the best way is to look up the status on any given calendar date for such a sparse dataset.
Thank you.

Answer from ezykj2lf:

For Spark 2.4+ you can use sequence and then explode it to do the forward fill. I have also assumed that your dates are in the format yyyy-MM-dd.
```
df.show()  # sample dataframe
# +---+-----+----------+
# | id|valid| eventdate|
# +---+-----+----------+
# |  1|false|2020-05-01|
# |  1| true|2020-05-06|
# |  2| true|2020-05-04|
# |  2|false|2020-05-07|
# |  2| true|2020-05-09|
# |  3|false|2020-05-11|
# +---+-----+----------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window().partitionBy("id").orderBy(F.to_date("eventdate", "yyyy-MM-dd"))

df.withColumn("lead", F.lead("eventdate").over(w))\
  .withColumn("sequence", F.when(F.col("lead").isNotNull(),
              F.expr("""sequence(to_date(eventdate),date_sub(to_date(lead),1), interval 1 day)"""))
              .otherwise(F.array("eventdate")))\
  .select("id", "valid", F.explode("sequence").alias("eventdate"))\
  .show(truncate=False)

# +---+-----+----------+
# |id |valid|eventdate |
# +---+-----+----------+
# |1  |false|2020-05-01|
# |1  |false|2020-05-02|
# |1  |false|2020-05-03|
# |1  |false|2020-05-04|
# |1  |false|2020-05-05|
# |1  |true |2020-05-06|
# |3  |false|2020-05-11|
# |2  |true |2020-05-04|
# |2  |true |2020-05-05|
# |2  |true |2020-05-06|
# |2  |false|2020-05-07|
# |2  |false|2020-05-08|
# |2  |true |2020-05-09|
# +---+-----+----------+
```
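One possible follow-up, since the question also wants every id carried forward up to today: the otherwise branch above emits only the last event date itself. A sketch of my own (not part of the original answer) that extends the last known status of each id to current_date() and then answers the point lookup with a plain filter:

```
# Sketch: same sequence/explode approach, but the last known status
# of each id is extended up to today instead of a single row.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window().partitionBy("id").orderBy(F.to_date("eventdate", "yyyy-MM-dd"))

filled = (df.withColumn("lead", F.lead("eventdate").over(w))
            .withColumn("sequence", F.when(
                 F.col("lead").isNotNull(),
                 F.expr("sequence(to_date(eventdate), date_sub(to_date(lead), 1), interval 1 day)")
             ).otherwise(
                 F.expr("sequence(to_date(eventdate), current_date(), interval 1 day)")
             ))
            .select("id", "valid", F.explode("sequence").alias("eventdate")))

# Point lookup from the question, e.g. id = 1 on 2020-05-05
filled.where("id = 1 AND eventdate = to_date('2020-05-05')").select("valid").show()
```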
