How can I do session filtering in PySpark?

zbdgwd5y  posted on 2021-07-14  in Spark

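I am working with time-series data in Spark and want to reduce the number of timesteps by applying a session filter: the filter groups timesteps that lie close together and keeps only the last timestep of each group. The constraint is that no timestep may be delayed by more than a specified time.
How can I do this elegantly in PySpark?
Example:
Maximum delay: 59 minutes
Input timesteps: 12:00, 12:01, 12:03, 13:00, 13:15, 13:45, 13:58, 14:15, 14:30, 14:45, 15:00, 15:20, 15:30, 19:00
I would like to obtain: 12:03, 13:58, 15:00, 15:30, 19:00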
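To make the intended grouping concrete, here is a minimal pure-Python sketch of the rule as I read it from the example: a session opens at the first timestep not covered by the previous session, spans at most the maximum delay from that opening timestep, and only its last timestep is kept. The names `session_filter` and `parse` are hypothetical helpers for illustration, not part of any library.

```python
from datetime import datetime, timedelta


def session_filter(timestamps, max_delay):
    """Keep only the last timestamp of each session.

    A session opens at the first timestamp that is not covered by the
    previous session and spans at most `max_delay` from that opening
    timestamp (boundary inclusive).
    """
    kept = []
    session_start = None
    last_in_session = None
    for ts in sorted(timestamps):
        if session_start is None or ts - session_start > max_delay:
            # `ts` opens a new session, so emit the last timestamp of the old one.
            if last_in_session is not None:
                kept.append(last_in_session)
            session_start = ts
        last_in_session = ts
    # Emit the last timestamp of the final session, if there was any input.
    if last_in_session is not None:
        kept.append(last_in_session)
    return kept


def parse(hhmm):
    """Hypothetical helper: parse an 'HH:MM' string into a datetime."""
    return datetime.strptime(hhmm, "%H:%M")


raw = ("12:00 12:01 12:03 13:00 13:15 13:45 13:58 "
       "14:15 14:30 14:45 15:00 15:20 15:30 19:00")
times = [parse(t) for t in raw.split()]
result = session_filter(times, timedelta(minutes=59))
print([t.strftime("%H:%M") for t in result])
# ['12:03', '13:58', '15:00', '15:30', '19:00']
```

Note that the rule is inherently sequential: where a session starts depends on where the previous one ended, which is why a single window expression does not capture it directly.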
So far I have only found a (slow and verbose) iterative solution:

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

"""
all_changes : pyspark.sql.DataFrame
        Input dataframe containing only partition columns and timestamp column
maxSessionDuration : int
        Maximum duration of a session in seconds
key : list
        List of partition keys
order_column : string
        Name of the timestamp column
max_iterations : int
        Maximum number of iterations to resolve a series of changes longer than the session duration.
"""

time_window = Window.partitionBy(key).orderBy("timestamp_seconds")

# Column names

timestep_seconds_col = "timestamp_seconds"
largest_preceding_col = "largest_preceding"
session_timestamp_col = "session_timestamp"
preserve_timestamp_col = "preserve_timestamp"

# Timestamp in seconds

all_changes = all_changes.withColumn(timestep_seconds_col, F.col(order_column).cast('timestamp').cast('long'))

# Split changes into session timesteps and non-session timesteps. On the first run, all timestamps are non-session timesteps.

session_timesteps = all_changes.filter(F.lit(False))
non_session_timesteps = all_changes.filter(F.lit(True))

# Condition for keeping records in case of a long series of changes that spans more than one session.

cond_preceding = (F.col(largest_preceding_col).isNull() |
                  (F.col(largest_preceding_col) < F.col(timestep_seconds_col)))

# Initialize

iterations = 0
converged = False

while iterations < max_iterations and not converged:
    iterations += 1

    new_timesteps = (
        non_session_timesteps
        # Step 1:
        # Max timestamp (value of order column) within max session duration
        .withColumn(session_timestamp_col,
                    F.max(F.col(timestep_seconds_col)).over(time_window.rangeBetween(0, maxSessionDuration)))
        # Step 2
        # Only keep session_timesteps if the timestamp is bigger than the session_timestamp of the previous row.
        .withColumn(session_timestamp_col,
                    F.when(F.col(timestep_seconds_col) > F.lag(session_timestamp_col, 1, 0).over(time_window),
                           F.col(session_timestamp_col))
                    .otherwise(F.lit(None))
                    )
        # Step 3
        # Account for long series of changes that can be longer than a session.
        .withColumn(largest_preceding_col,
                    F.last(F.col(session_timestamp_col),
                           ignorenulls=True)
                    .over(time_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
        .withColumn(preserve_timestamp_col, cond_preceding)
    )
    # Session timestamps
    new_session_timestamps = (
        new_timesteps
        .filter(F.col(session_timestamp_col).isNotNull())
        .withColumn(timestep_seconds_col, F.col(session_timestamp_col))
        .drop(session_timestamp_col, largest_preceding_col, preserve_timestamp_col)
    )
    session_timesteps = session_timesteps.unionByName(new_session_timestamps)

    # Non-session timesteps to be treated in next iteration
    non_session_timesteps = (
        new_timesteps
        .filter(F.col(preserve_timestamp_col))
        .drop(session_timestamp_col, largest_preceding_col, preserve_timestamp_col)
    )
    # Convergence means that no time steps needed to be recovered due to consecutive changes
    converged = non_session_timesteps.count() == 0

# Use the remaining session_timestamps

all_changes_reduced = (
    session_timesteps
    .unionByName(non_session_timesteps)  # in case max iterations is reached
    .select(*key, F.col(timestep_seconds_col).cast(T.TimestampType()).alias(order_column))
)
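For comparison, one non-iterative direction would be to collect the timestamps of each key into a list and run the sequential session rule in a UDF. The sketch below is only an assumption-laden outline, not a tested solution: `last_of_each_session` and `reduce_changes` are hypothetical names, the intermediate column `all_seconds` is made up for illustration, and it assumes that all timestamps of one key fit into the memory of a single executor.

```python
def last_of_each_session(seconds, max_session_duration):
    """Return the last timestamp (epoch seconds) of each session, in order."""
    kept, session_start, previous = [], None, None
    for ts in sorted(seconds):
        if session_start is None or ts - session_start > max_session_duration:
            # `ts` opens a new session; keep the last timestamp of the old one.
            if previous is not None:
                kept.append(previous)
            session_start = ts
        previous = ts
    if previous is not None:
        kept.append(previous)
    return kept


def reduce_changes(all_changes, key, order_column, max_session_duration):
    """Hypothetical wrapper: apply the session rule once per partition key."""
    # Imported here so the pure-Python helper above can be used without Spark.
    from pyspark.sql import functions as F, types as T

    filter_udf = F.udf(
        lambda seconds: last_of_each_session(seconds, max_session_duration),
        T.ArrayType(T.LongType()),
    )
    return (
        all_changes
        # Same conversion as above: timestamp -> epoch seconds.
        .withColumn("timestamp_seconds",
                    F.col(order_column).cast("timestamp").cast("long"))
        .groupBy(*key)
        .agg(F.collect_list("timestamp_seconds").alias("all_seconds"))
        # One output row per kept session timestamp.
        .withColumn("timestamp_seconds", F.explode(filter_udf("all_seconds")))
        .select(*key,
                F.col("timestamp_seconds").cast(T.TimestampType()).alias(order_column))
    )
```

It would be called as `reduce_changes(all_changes, key, order_column, maxSessionDuration)`. The trade-off is that the per-key work runs in Python on one executor rather than as native window expressions, so it may not suit keys with very long histories.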
