问题:假设时间序列数据是用户活动的点击流,存储在配置单元中,Ask是使用Spark用会话ID来丰富数据。
会话定义
- 会话在1小时不活动后到期
- 会话保持活动状态,总持续时间为2小时
数据:
click_time,user_id
2018-01-01 11:00:00,u1
2018-01-01 12:10:00,u1
2018-01-01 13:00:00,u1
2018-01-01 13:50:00,u1
2018-01-01 14:40:00,u1
2018-01-01 15:30:00,u1
2018-01-01 16:20:00,u1
2018-01-01 16:50:00,u1
2018-01-01 11:00:00,u2
2018-01-02 11:00:00,u2
以下是仅考虑会话定义中的第一点的部分解决方案:
val win1 = Window.partitionBy("user_id").orderBy("click_time")
val sessionnew = when((unix_timestamp($"click_time") - unix_timestamp(lag($"click_time",1,"2017-01-01 11:00:00.0").over(win1)))/60 >= 60, 1).otherwise(0)
userActivity
.withColumn("session_num",sum(sessionnew).over(win1))
.withColumn("session_id",concat($"user_id", $"session_num"))
.show(truncate = false)
实际产量:
+---------------------+-------+-----------+----------+
|click_time |user_id|session_num|session_id|
+---------------------+-------+-----------+----------+
|2018-01-01 11:00:00.0|u1 |1 |u11 |
|2018-01-01 12:10:00.0|u1 |2 |u12 | -- session u12 starts
|2018-01-01 13:00:00.0|u1 |2 |u12 |
|2018-01-01 13:50:00.0|u1 |2 |u12 |
|2018-01-01 14:40:00.0|u1 |2 |u12 | -- this should be a new session as diff of session start of u12 and this row exceeds 2 hours
|2018-01-01 15:30:00.0|u1 |2 |u12 |
|2018-01-01 16:20:00.0|u1 |2 |u12 |
|2018-01-01 16:50:00.0|u1 |2 |u12 | -- now this has to be compared with row 5 to find difference
|2018-01-01 11:00:00.0|u2 |1 |u21 |
|2018-01-02 11:00:00.0|u2 |2 |u22 |
+---------------------+-------+-----------+----------+
为了包含第二个条件,我尝试找出当前时间与上次会话开始时间之间的差异,以检查是否超过2小时,但引用本身对以下行进行了更改。这些是一些可以通过运行Sum来实现的用例,但这不适合这里。
5条答案
按热度按时间efzxgjgh1#
这不是一个直接要解决的问题,但这里有一种方法:
1.使用窗口
lag
时间戳差异为rule #1
标识每个用户的会话(其中0
=会话的开始1.对数据集进行分组,以组合每个用户的时间戳差异列表
1.通过UDF处理时间戳差异列表,以标识
rule #2
的会话并为每个用户创建所有会话ID1.通过Spark的
explode
扩展分组数据集示例代码如下:
请注意,UDF中
foldLeft
的累加器是(ls, j, k)
的元组,其中:ls
是要返回的格式化会话ID列表j
和k
分别用于将有条件更改的时间戳值和会话ID号带入下一次迭代步骤
1
:步骤
2
-4
:还要注意,在步骤
2
-4
中,click_time
被“传递”,以便被包括在最终数据集中。iecba09b2#
虽然Leo提供的答案工作得很好,但我觉得使用收集和分解函数来解决问题是一个复杂的方法。这个问题可以使用Spark的方法通过使用UDAF来解决,以便在不久的将来也可以进行修改。请在下面的类似行中查看解决方案
bnl4lu3b3#
完整解决方案
u1ehiz5o4#
简单的方法:
wfypjpf45#
-不使用分解的解决方案-。
---Steps taken to solve ---
1.首先找出点击时间不到1小时的点击流,找出连续的分组。
2.然后找出基于2小时条件的点击流,并使该条件的连续群适用我必须基于如下逻辑创建两个连续群。
3.一个组将基于时间差的总和,并使一个组,即TEMP_SESSION_2hr,并在此基础上找到下一个组FINAL_SESSION_GROUPS。
4.将以上连续组相加并加+1以填充ALGO末尾的FINAL_SESSION列,并根据您的要求进行连接以显示SESSION_ID。‘
result will be looking like this
`