I have the following problem. I have a dataset that tracks changes of status.
id valid eventdate
1 False 2020-05-01
1 True 2020-05-06
2 True 2020-05-04
2 False 2020-05-07
2 True 2020-05-09
3 False 2020-05-11
Goal:
SELECT valid FROM table WHERE id = 1 AND eventdate = "2020-05-05"
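For any given date (between the first event and today) I need to know what the status was on that day. For example, for id 1 the value of valid was still False on May 5th.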
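To make the expected semantics concrete, here is a minimal plain-Python sketch of such an "as of" lookup on the sample data. The names events and status_on are hypothetical, and returning None for dates before an id's first event is an assumption (the pandas version fills those days with False instead).

```python
from bisect import bisect_right
from datetime import date

# Change events per id, sorted by date: (eventdate, valid)
events = {
    1: [(date(2020, 5, 1), False), (date(2020, 5, 6), True)],
    2: [(date(2020, 5, 4), True), (date(2020, 5, 7), False), (date(2020, 5, 9), True)],
    3: [(date(2020, 5, 11), False)],
}


def status_on(id_, day):
    """Return the status in effect on `day`, or None before the first event."""
    changes = events[id_]
    dates = [d for d, _ in changes]
    pos = bisect_right(dates, day)  # number of events on or before `day`
    return changes[pos - 1][1] if pos else None


print(status_on(1, date(2020, 5, 5)))  # last change on or before May 5th
```

The lookup only touches the sparse change events, so nothing has to be materialized per day.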
In Pandas I have a solution for this, where I use pivot and ffill to populate the empty values, and then a melt to turn the result back into a three-column DataFrame.
from datetime import datetime
import pandas as pd
test_data = [
[1,"False","2020-05-01"],
[1,"True","2020-05-06"],
[2,"True","2020-05-04"],
[2,"False","2020-05-07"],
[2,"True","2020-05-09"],
[3,"False","2020-05-11"]
]
# Create inputframe
df = pd.DataFrame(test_data, columns=['id', 'valid', 'eventdate'])
df['id'] = df['id'].astype(str)
df['valid'] = df['valid'] == "True"
df['eventdate'] = pd.to_datetime(df['eventdate'])
print(df.head(6))
# id valid eventdate
# 0 1 False 2020-05-01
# 1 1 True 2020-05-06
# 2 2 True 2020-05-04
# 3 2 False 2020-05-07
# 4 2 True 2020-05-09
# 5 3 False 2020-05-11
# Create full time range as frame
timeframe = pd.date_range(start=min(df['eventdate']),
end=datetime.now().date()).to_frame().reset_index(drop=True).rename(columns={0: 'eventdate'})
print(timeframe.head())
# eventdate
# 0 2020-05-01
# 1 2020-05-02
# 2 2020-05-03
# 3 2020-05-04
# 4 2020-05-05
# Merge timeframe into original frame
df = df.merge(timeframe,
left_on='eventdate',
right_on='eventdate',
how='right')
print(df.sort_values('eventdate').head())
# id valid eventdate
# 0 1 False 2020-05-01
# 6 NaN NaN 2020-05-02
# 7 NaN NaN 2020-05-03
# 2 2 True 2020-05-04
# 8 NaN NaN 2020-05-05
# 1. Pivot to get dates on rows and ids as columns
# 2. Forward fill values per id
# 3. Fill remaining NaNs with False
df = df.pivot(index='eventdate',
columns='id',
values='valid')\
.ffill()\
.fillna(False)
print(df.head())
# id NaN 1 2 3
# eventdate
# 2020-05-01 False False False False
# 2020-05-02 False False False False
# 2020-05-03 False False False False
# 2020-05-04 False False True False
# 2020-05-05 False False True False
# Drop NaN column and reset the index
df = df.loc[:, df.columns.notnull()].reset_index()
# Melt the columns back
out = pd.melt(df,
id_vars='eventdate',
value_name='valid')
print(out.head(10))
# eventdate id valid
# 0 2020-05-01 1 False
# 1 2020-05-02 1 False
# 2 2020-05-03 1 False
# 3 2020-05-04 1 False
# 4 2020-05-05 1 False
# 5 2020-05-06 1 True
# 6 2020-05-07 1 True
# 7 2020-05-08 1 True
# 8 2020-05-09 1 True
# 9 2020-05-10 1 True
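The same daily table can probably be built without the pivot/melt round trip. The following is a minimal sketch, assuming pandas: it creates the full grid of ids and days, left-merges the events onto it, and forward fills within each id. The names events, days, grid and daily are hypothetical, and the end date is fixed at 2020-05-13 only to keep the example reproducible (the code above uses datetime.now()).

```python
import pandas as pd

events = pd.DataFrame(
    {
        "id": ["1", "1", "2", "2", "2", "3"],
        "valid": [False, True, True, False, True, False],
        "eventdate": pd.to_datetime(
            ["2020-05-01", "2020-05-06", "2020-05-04",
             "2020-05-07", "2020-05-09", "2020-05-11"]
        ),
    }
)

# Assumption: fixed end date instead of today's date
days = pd.date_range(events["eventdate"].min(), "2020-05-13")

# Every (id, day) combination, ordered by id and then by day
grid = pd.MultiIndex.from_product(
    [events["id"].unique(), days], names=["id", "eventdate"]
).to_frame(index=False)

# Days without an event get NaN in `valid`
daily = grid.merge(events, on=["id", "eventdate"], how="left")

# Carry the last known status forward within each id;
# days before an id's first event become False, as in the pivot version
daily["valid"] = (
    daily.groupby("id")["valid"].ffill().fillna(False).astype(bool)
)

print(daily[daily["id"] == "1"].head(6))
```

The result is still one row per id and day, so the redundancy concern remains; only the reshaping steps go away.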
I am trying to achieve the same in Spark, but forward fill does not exist there. I know how to get the latest status per id:
w = Window().partitionBy("id").orderBy(F.col("eventdate").desc())
df.withColumn("rn", F.row_number().over(w)) \
.where(F.col("rn") == 1) \
.selectExpr("id", "valid", "eventdate AS last_change") \
.dropna() \
.show()
The pivot can be done with:
df\
.select(["id", "valid", "eventdate"])\
.groupBy(["eventdate"])\
.pivot("id")\
.agg(F.min("valid"))\
.drop('null')\
.sort('eventdate')\
.show()
For the forward fill, I got this far by limiting the dataset to a single id:
import sys
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window
test_data = [
[1,"False","2020-05-01"],
[1,"True","2020-05-06"],
[2,"True","2020-05-04"],
[2,"False","2020-05-07"],
[2,"True","2020-05-09"],
[3,"False","2020-05-11"]
]
# Create dataframe
df = sc\
.parallelize(test_data)\
.toDF(("id", "valid", "eventdate"))\
.withColumn("eventdate", F.to_date(F.to_timestamp("eventdate")))\
.withColumn("valid", F.when(F.col("valid") == "True", 1).otherwise(0))
df.createOrReplaceTempView("df")
# Create event frame
event_dates = spark.sql("SELECT sequence(min(eventdate), CURRENT_DATE(), interval 1 day) as eventdate FROM df")\
.withColumn("eventdate",
F.explode(F.col("eventdate")))
# Join dates and data
df = df.join(event_dates, on='eventdate', how='right')
df2 = df.where(df.id == 1)\
.join(event_dates, on='eventdate', how='right')\
.withColumn('id', F.lit(1))
# df2.sort('eventdate').show()
# +----------+---+-----+
# | eventdate| id|valid|
# +----------+---+-----+
# |2020-05-01| 1| 0|
# |2020-05-02| 1| null|
# |2020-05-03| 1| null|
# |2020-05-04| 1| null|
# |2020-05-05| 1| null|
# |2020-05-06| 1| 1|
# |2020-05-07| 1| null|
# |2020-05-08| 1| null|
# |2020-05-09| 1| null|
# |2020-05-10| 1| null|
# |2020-05-11| 1| null|
# |2020-05-12| 1| null|
# |2020-05-13| 1| null|
# +----------+---+-----+
# Forward fill
window = Window.partitionBy('id')\
.orderBy('eventdate')\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# Set filter
read_last = F.last(df2['valid'], ignorenulls=True).over(window)
df2.withColumn("ffill", read_last).show()
# +----------+---+-----+-----+
# | eventdate| id|valid|ffill|
# +----------+---+-----+-----+
# |2020-05-01| 1| 0| 0|
# |2020-05-02| 1| null| 0|
# |2020-05-03| 1| null| 0|
# |2020-05-04| 1| null| 0|
# |2020-05-05| 1| null| 0|
# |2020-05-06| 1| 1| 1|
# |2020-05-07| 1| null| 1|
# |2020-05-08| 1| null| 1|
# |2020-05-09| 1| null| 1|
# |2020-05-10| 1| null| 1|
# |2020-05-11| 1| null| 1|
# |2020-05-12| 1| null| 1|
# |2020-05-13| 1| null| 1|
# +----------+---+-----+-----+
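I think the first question is whether this way of answering the question is the right one. Doing a pivot creates a long table with few columns while storing a lot of redundant data. Spark is not the right tool for this problem, or rather, the problem itself is not well suited to Spark. I know that ideally you would make use of parallel processing, and maybe broadcast the timeframe and compute the forward fill for each id on each node?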
Would it be better to use a different approach, e.g. storing an enddate and using the following when querying:
id valid eventdate enddate
1 False 2020-05-01 2020-05-06
1 True 2020-05-06 2999-12-31
2 True 2020-05-04 2020-05-07
2 False 2020-05-07 2020-05-08
2 True 2020-05-09 2999-12-31
3 False 2020-05-11 2999-12-31
and
SELECT valid FROM table WHERE id = 1 AND "2020-05-05" between eventdate and enddate
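One caveat with this layout: BETWEEN is inclusive on both ends, so when enddate equals the next eventdate, a lookup on a change date such as 2020-05-06 matches two rows for id 1. Below is a minimal sketch, assuming pandas, that derives enddate from the next event of the same id and queries a half-open interval (eventdate <= day < enddate) instead. The name valid_on is hypothetical, and NaT is used for the still-open interval in place of the 2999-12-31 sentinel, which does not fit in pandas' default nanosecond timestamps.

```python
import pandas as pd

events = pd.DataFrame(
    {
        "id": [1, 1, 2, 2, 2, 3],
        "valid": [False, True, True, False, True, False],
        "eventdate": pd.to_datetime(
            ["2020-05-01", "2020-05-06", "2020-05-04",
             "2020-05-07", "2020-05-09", "2020-05-11"]
        ),
    }
)

events = events.sort_values(["id", "eventdate"]).reset_index(drop=True)

# enddate = next change of the same id; NaT marks the interval that is still open
events["enddate"] = events.groupby("id")["eventdate"].shift(-1)


def valid_on(id_, day):
    """Status of `id_` on `day`, or None before its first event."""
    day = pd.Timestamp(day)
    hit = events[
        (events["id"] == id_)
        & (events["eventdate"] <= day)
        & (events["enddate"].isna() | (day < events["enddate"]))
    ]
    return bool(hit["valid"].iloc[0]) if len(hit) else None


print(valid_on(1, "2020-05-05"))
```

With the half-open interval each id and day matches at most one row, and the table stays as small as the original event log.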
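Please let me know whether the Spark approach makes sense, and what the best way is to look up the status on any given calendar date for such a sparse dataset.
Thank you.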
1 answer
For Spark 2.4+ you can use sequence and then explode it to forward fill. I also assumed that your dates are in the format yyyy-MM-dd.
```
# Sample dataframe, as printed by df.show():
# +---+-----+----------+
# | id|valid| eventdate|
# +---+-----+----------+
# |  1|false|2020-05-01|
# |  1| true|2020-05-06|
# |  2| true|2020-05-04|
# |  2|false|2020-05-07|
# |  2| true|2020-05-09|
# |  3|false|2020-05-11|
# +---+-----+----------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order each id's events by date so that lead() returns the next change
w = Window.partitionBy("id").orderBy(F.to_date("eventdate", "yyyy-MM-dd"))

(
    df.withColumn("lead", F.lead("eventdate").over(w))
    # One entry per day from this event up to the day before the next event;
    # the last event of an id has no lead and keeps only its own date.
    # to_date in the otherwise branch keeps both branches of type array<date>.
    .withColumn(
        "sequence",
        F.when(
            F.col("lead").isNotNull(),
            F.expr("sequence(to_date(eventdate), date_sub(to_date(lead), 1), interval 1 day)"),
        ).otherwise(F.array(F.to_date("eventdate"))),
    )
    .select("id", "valid", F.explode("sequence").alias("eventdate"))
    .show(truncate=False)
)
# +---+-----+----------+
# |id |valid|eventdate |
# +---+-----+----------+
# |1  |false|2020-05-01|
# |1  |false|2020-05-02|
# |1  |false|2020-05-03|
# |1  |false|2020-05-04|
# |1  |false|2020-05-05|
# |1  |true |2020-05-06|
# |3  |false|2020-05-11|
# |2  |true |2020-05-04|
# |2  |true |2020-05-05|
# |2  |true |2020-05-06|
# |2  |false|2020-05-07|
# |2  |false|2020-05-08|
# |2  |true |2020-05-09|
# +---+-----+----------+
```