我有一个PyparkDataframe,格式如下:
+-------+----------+---------------------+
| event | consumer | timestamp |
+-------+----------+---------------------+
| E | 1 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 |
| E | 1 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 |
| T | 2 | 2020-09-09 13:20:00 |
| E | 2 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+
有没有一种方法可以遍历由 consumer
并由 timestamp
并将值设置为新列?
新列将定义 session_timestamp
. 这就是背后的逻辑:
会话仅以事件开始 E
.
如果会话开始后一小时内发生新事件,则该事件属于该会话。
如果一个事件发生在启动会话的事件的一个小时以上,则它属于另一个会话(这是Dataframe中第2行和第3行之间发生的情况)。
因此,上述Dataframe的结果是:
+-------+----------+---------------------+---------------------+
| event | consumer | timestamp | session_timestamp |
+-------+----------+---------------------+---------------------+
| E | 1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T | 2 | 2020-09-09 13:20:00 | Null |
| E | 2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+
有办法在Pypark上做吗?
1条答案
按热度按时间iibxawm41#
正如@ofek在评论中所说,
window
功能将帮助您。这里给你一个scala的例子,你可以自己用python重写它(考虑到pyspark中的自定义聚合函数并不容易,这里收集并使用udf处理它)结果如下:
(另外,您的样品说明结果不正确。两者之间的时间
2020-09-09 14:20:00
以及2020-09-09 13:30:00
50分钟<1小时)