我有一个带有应用程序日志的数据集,可以显示某个应用程序何时启动或关闭。有时,日志中可能完全缺少相关事件。我想匹配每个应用程序开始与相关的结束事件(如果它存在)。
下面是一个示例数据集:
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([['Group1', 'Logon', 'Name1', '2021-02-05T19:03:00.000+0000'],
['Group1', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
['Group1', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
['Group1', 'End', 'Name1', '2021-02-05T19:06:00.000+0000'],
['Group1', 'End', 'Name3', '2021-02-05T19:06:01.000+0000'],
['Group1', 'End', 'Name1', '2021-02-05T19:07:00.000+0000'],
['Group2', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
['Group2', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
['Group2', 'Start', 'Name2', '2021-02-05T19:06:00.000+0000'],
['Group2', 'End', 'Name1', '2021-02-05T19:07:00.000+0000'],
['Group2', 'Close', 'Name1', '2021-02-05T19:07:00.000+0000'],
], ['group', 'type', 'name', 'time'])
df = df.withColumn('time', F.col('time').cast('timestamp'))
对于每个单独的组,如果“start”和“end”事件具有相同的“name”,我想为它们设置一个公共标识符。换言之,对于每个“开始”事件,我希望找到尚未与另一个“开始”事件匹配的第一个“结束”事件。
预期结果可能类似于下图:
我不介意标识符(即“myu group”)是一个id、一个时间戳还是在组之间单调递增。我只想能够匹配每个小组的相关事件。
我试过的
我考虑使用窗口函数来识别“开始”事件的结束时间和“结束”事件的开始时间。但是,我不能局限于只搜索“结束”事件(分别搜索“开始”事件)。此外,我无法应用上面描述的逻辑来查找第一个“结束”事件,而该事件尚未与另一个“开始”事件匹配。
这是我的密码:
app_session_window_down = Window.partitionBy('group', "name").orderBy(F.col("time").cast('long')).rangeBetween(1, Window.unboundedFollowing) #search in the future
app_session_window_up = Window.partitionBy('group', "name").orderBy(F.col("time").cast('long')).rangeBetween(Window.unboundedPreceding, -1) #search in the past
df = df.withColumn("app_time_end", F.when((F.col("type") == 'Start'), F.first(F.col('time'), ignorenulls=True).over(app_session_window_down)).otherwise(F.lit('None')))\
.withColumn("app_time_start", F.when((F.col("type") == 'End'), F.last(F.col('time'), ignorenulls=True).over(app_session_window_up)).otherwise(F.col('app_time_end')))
它给出:
这离我想达到的目标还差得远。有什么提示吗?
1条答案
按热度按时间8dtrkrch1#
解释见内联注解: