I am working with time-series data in Spark and want to reduce the number of timesteps by applying a session filter that groups timesteps together and keeps only the last timestep of each group, while making sure that no timestep is delayed by more than a given maximum.
How can I do this elegantly with PySpark?
Example:
Maximum delay of 59 minutes
Input timesteps: 12:00, 12:01, 12:03, 13:00, 13:15, 13:45, 13:58, 14:15, 14:30, 14:45, 15:00, 15:20, 15:30, 19:00
I want to obtain: 12:03, 13:58, 15:00, 15:30, 19:00
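For reference, the example input can be built as a test DataFrame like this (the partition column id and the date 2023-01-01 are placeholders, not part of the question):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
times = ["12:00", "12:01", "12:03", "13:00", "13:15", "13:45", "13:58",
         "14:15", "14:30", "14:45", "15:00", "15:20", "15:30", "19:00"]
df = spark.createDataFrame(
    [(1, f"2023-01-01 {t}:00") for t in times],
    ["id", "timestamp"],
).withColumn("timestamp", F.to_timestamp("timestamp"))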
So far, I have only found a (slow and verbose) iterative solution:
"""
all_canges : pyspark.sql.DataFrame
Input dataframe containing only partition columns and timestamp column
maxSessionDuration : int
Maximum duration of a session in seconds
key : list
List of partition keys
order_column : string
Name of the timestamp column
max_iterations: int
Maximum number of iterations to resolve a series of changes longer than the session duration.
"""
from pyspark.sql import Window, functions as F, types as T

# Column names
timestep_seconds_col = "timestamp_seconds"
largest_preceding_col = "largest_preceding"
session_timestamp_col = "session_timestamp"
preserve_timestamp_col = "preserve_timestamp"

time_window = Window.partitionBy(key).orderBy(timestep_seconds_col)

# Timestamp in seconds
all_changes = all_changes.withColumn(timestep_seconds_col, F.col(order_column).cast('timestamp').cast('long'))

# Split the changes into session timesteps and non-session timesteps;
# on the first pass, all timestamps are non-session timesteps
session_timesteps = all_changes.filter(F.lit(False))
non_session_timesteps = all_changes.filter(F.lit(True))

# Condition for keeping records in case a long series of changes spans more
# than one session duration
cond_preceding = (F.col(largest_preceding_col).isNull() |
                  (F.col(largest_preceding_col) < F.col(timestep_seconds_col)))

# Initialize
iterations = 0
converged = False
while iterations < max_iterations and not converged:
    iterations += 1
    new_timesteps = (
        non_session_timesteps
        # Step 1:
        # Max timestamp (value of the order column) within the max session duration,
        # computed over a range window that looks maxSessionDuration seconds ahead
        .withColumn(session_timestamp_col,
                    F.max(F.col(timestep_seconds_col)).over(time_window.rangeBetween(0, maxSessionDuration)))
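        # For the example above (maxSessionDuration = 59 * 60), the 12:00 row gets
        # session_timestamp 12:03 (the latest timestep in [12:00, 12:59]), while the
        # 12:01 row gets 13:00 (rangeBetween is inclusive on both ends).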
        # Step 2:
        # Keep the session_timestamp only if the current timestamp is larger than
        # the session_timestamp of the previous row, i.e. the row opens a new session
        .withColumn(session_timestamp_col,
                    F.when(F.col(timestep_seconds_col) > F.lag(session_timestamp_col, 1, 0).over(time_window),
                           F.col(session_timestamp_col))
                     .otherwise(F.lit(None)))
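        # In the example's first iteration, only 12:00 (no predecessor) and 19:00
        # keep a session_timestamp here; every row in between is shadowed by a
        # predecessor whose Step-1 value already reaches past it.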
        # Step 3:
        # Account for a long series of changes that can span more than one session:
        # carry the most recent non-null session_timestamp forward, and flag every
        # row that lies beyond it so it can be re-sessionized in the next iteration
        .withColumn(largest_preceding_col,
                    F.last(F.col(session_timestamp_col), ignorenulls=True)
                     .over(time_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
        .withColumn(preserve_timestamp_col, cond_preceding)
    )
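    # On the example's first pass, 12:01 and 12:03 are covered by the session
    # stamp 12:03 and flagged False, while 13:00 .. 15:30 lie beyond it and are
    # preserved for the next iteration.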
    # Session timestamps
    new_session_timestamps = (
        new_timesteps
        .filter(F.col(session_timestamp_col).isNotNull())
        .withColumn(timestep_seconds_col, F.col(session_timestamp_col))
        .drop(session_timestamp_col, largest_preceding_col, preserve_timestamp_col)
    )
    session_timesteps = session_timesteps.unionByName(new_session_timestamps)

    # Non-session timesteps to be treated in the next iteration
    non_session_timesteps = (
        new_timesteps
        .filter(F.col(preserve_timestamp_col))
        .drop(session_timestamp_col, largest_preceding_col, preserve_timestamp_col)
    )

    # Convergence means that no timesteps needed to be recovered due to consecutive changes
    converged = non_session_timesteps.count() == 0
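    # Note: count() materializes the remaining rows and triggers a full Spark job
    # on every iteration, which is one reason this approach is slow.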
# Use the collected session timestamps
all_changes_reduced = (
    session_timesteps
    .unionByName(non_session_timesteps)  # in case max_iterations is reached
    .select(*key, F.col(timestep_seconds_col).cast(T.TimestampType()).alias(order_column))
)
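For completeness, this is how the snippet could be driven against the example data, assuming it is wrapped in a function taking the parameters from the docstring (the name reduce_timesteps and the partition column id are placeholders, not part of the original code):

result = reduce_timesteps(
    all_changes=df,
    maxSessionDuration=59 * 60,  # 59 minutes, in seconds
    key=["id"],
    order_column="timestamp",
    max_iterations=10,
)
result.orderBy("timestamp").show(truncate=False)
# Expected timestamps: 12:03, 13:58, 15:00, 15:30, 19:00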