python—有没有一种方法可以遍历pysparkDataframe并标识没有显式会话键的会话?

yyhrrdl8  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(447)

我有一个PyparkDataframe,格式如下:

+-------+----------+---------------------+
| event | consumer |      timestamp      |
+-------+----------+---------------------+
| E     |        1 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 13:30:00 |
| E     |        1 | 2020-09-09 14:20:00 |
| T     |        1 | 2020-09-09 14:35:00 |
| T     |        2 | 2020-09-09 13:20:00 |
| E     |        2 | 2020-09-09 13:25:00 |
| E     |        2 | 2020-09-09 14:45:00 |
| T     |        2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+

有没有一种方法可以遍历由 consumer 并由 timestamp 并将值设置为新列?
新列将定义 session_timestamp . 这就是背后的逻辑:
会话仅以事件开始 E .
如果会话开始后一小时内发生新事件,则该事件属于该会话。
如果一个事件发生在启动会话的事件的一个小时以上,则它属于另一个会话(这是Dataframe中第2行和第3行之间发生的情况)。
因此,上述Dataframe的结果是:

+-------+----------+---------------------+---------------------+
| event | consumer |      timestamp      |  session_timestamp  |
+-------+----------+---------------------+---------------------+
| E     |        1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T     |        1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T     |        2 | 2020-09-09 13:20:00 | Null                |
| E     |        2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E     |        2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T     |        2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+

有办法在Pypark上做吗?

iibxawm4

iibxawm41#

正如@ofek在评论中所说, window 功能将帮助您。这里给你一个scala的例子,你可以自己用python重写它(考虑到pyspark中的自定义聚合函数并不容易,这里收集并使用udf处理它)

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = <your-dataframe>

val findSessionStartTime = udf((rows: Seq[Seq[Any]]) => {
  val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  var result: Date = null
  for (row <- rows.reverse) {
    val event = row(0)
    val time = parser.parse(row(1).toString)
    if (event == "E") {
      if (result == null || result.getTime - time.getTime < 3600000) {
        result = time
      }
    }
  }
  if (result == null)
    null
  else
    parser.format(result)
})

df.withColumn("events", collect_list(array($"event", $"timestamp")).over(Window
  .partitionBy($"consumer")
  .orderBy($"timestamp")))
  .withColumn("session_timestamp", findSessionStartTime($"events"))
  .drop("events")
  .show(false)

结果如下:
(另外,您的样品说明结果不正确。两者之间的时间 2020-09-09 14:20:00 以及 2020-09-09 13:30:00 50分钟<1小时)

+-----+--------+-------------------+-------------------+
|event|consumer|timestamp          |session_timestamp  |
+-----+--------+-------------------+-------------------+
|E    |1       |2020-09-09 13:15:00|2020-09-09 13:15:00|
|E    |1       |2020-09-09 13:30:00|2020-09-09 13:15:00|
|E    |1       |2020-09-09 14:20:00|2020-09-09 13:15:00|
|T    |1       |2020-09-09 14:35:00|2020-09-09 13:15:00|
|T    |2       |2020-09-09 13:20:00|null               |
|E    |2       |2020-09-09 13:25:00|2020-09-09 13:25:00|
|E    |2       |2020-09-09 14:45:00|2020-09-09 14:45:00|
|T    |2       |2020-09-09 14:50:00|2020-09-09 14:45:00|
+-----+--------+-------------------+-------------------+

相关问题