Scala: counting concurrent user sessions per time interval

vpfxa7rd posted on 2021-05-27 in Spark

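My data consists of sessions with start and end timestamps. My task is to count, per company and application version, the number of sessions that are "active" in each time interval. I'm starting with 30-minute intervals. So if a company has a session from 2:10 PM to 3:35 PM, that company is counted in each of the 4 bins/intervals (2:00, 2:30, 3:00, 3:30). How can I solve this in Spark/Scala?
Ultimately this has to scale to millions of sessions per day.
Here is my sample data: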
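The binning rule above works in two steps. First, round the start down to a 30-minute boundary. Then count every boundary up to the end. That rule can be sketched in plain Scala with java.time, without Spark. `SessionBins`, `floorTo` and `bins` are hypothetical names. The sketch assumes bins are aligned to UTC epoch time, which is also what a `floor(epochSeconds / seconds) * seconds` rounding does.

```scala
import java.time.{Duration, LocalDateTime, ZoneOffset}

// Hypothetical helper; bins are assumed to be aligned to UTC epoch time.
object SessionBins {
  // Start of the bin containing t: floor(epochSeconds / step) * step.
  def floorTo(t: LocalDateTime, step: Duration): LocalDateTime = {
    val stepSec = step.getSeconds
    val epoch   = t.toEpochSecond(ZoneOffset.UTC)
    LocalDateTime.ofEpochSecond(Math.floorDiv(epoch, stepSec) * stepSec, 0, ZoneOffset.UTC)
  }

  // Every bin start from floorTo(start) up to and including end,
  // i.e. every bin in which the session [start, end] is active.
  def bins(start: LocalDateTime, end: LocalDateTime, step: Duration): List[LocalDateTime] =
    Iterator.iterate(floorTo(start, step))(_.plus(step))
      .takeWhile(!_.isAfter(end))
      .toList
}
```

Take the question's example: `SessionBins.bins(LocalDateTime.of(2020, 7, 1, 14, 10), LocalDateTime.of(2020, 7, 1, 15, 35), Duration.ofMinutes(30))`. It returns the four bin starts 14:00, 14:30, 15:00 and 15:30.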

import org.apache.spark.sql.functions._
import spark.implicits._

val df = sc.parallelize(List(
  ("Company B","xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
  ("Company A","xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
  ("Company B","xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
  ("Company B","xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
  ("Company A","xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
  ("Company B","xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
  ("Company B","xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
  ("Company A","xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
  ("Company A","xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
  ("Company B","xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
  ("Company A","xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
  ("Company B","xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
  ("Company A","xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
  ("Company A","xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
  ("Company A","xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")
)).toDF("customer","device_model","start_timestamp","end_timestamp")
  .withColumn("start_timestamp", to_timestamp($"start_timestamp"))
  .withColumn("end_timestamp", to_timestamp($"end_timestamp"))
display(df) // Databricks notebook helper; use df.show(false) elsewhere

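I'd like my result to look like the following. These counts are for 30-minute intervals, but eventually I'll count at intervals as small as one or two minutes.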

timeinterval         customer   xi1  xi2
2020-07-01 22:30:00  Company A  1    1
2020-07-01 22:30:00  Company B  0    1
2020-07-01 23:00:00  Company A  1    2
2020-07-01 23:00:00  Company B  1    1
2020-07-01 23:30:00  Company A  1    4
2020-07-01 23:30:00  Company B  2    1
2020-07-02 00:00:00  Company A  1    4
2020-07-02 00:00:00  Company B  3    1
2020-07-02 00:30:00  Company A  2    4
2020-07-02 00:30:00  Company B  3    1
2020-07-02 01:00:00  Company A  2    3
2020-07-02 01:00:00  Company B  2    2
2020-07-02 01:30:00  Company A  2    1
2020-07-02 01:30:00  Company B  2    2
2020-07-02 02:00:00  Company A  0    1
2020-07-02 02:00:00  Company B  1    2
2020-07-02 02:30:00  Company B  1    1
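The plan is to go from 30-minute intervals down to 1 or 2 minutes, on millions of sessions per day. So it helps to estimate how many rows a "one row per session per bin" approach produces. A session touches floor(end / step) - floor(start / step) + 1 bins. The hypothetical `BinCount` helper below just evaluates that formula, with times given as seconds (an assumption).

```scala
// Hypothetical helper: how many bins of width stepSec a session
// [startSec, endSec] touches, i.e. how many rows it becomes if each
// session is expanded into one row per bin.
object BinCount {
  def binsTouched(startSec: Long, endSec: Long, stepSec: Long): Long =
    Math.floorDiv(endSec, stepSec) - Math.floorDiv(startSec, stepSec) + 1

  // Rough size of the expanded data: the sum of bins over all sessions.
  def explodedRows(sessions: Seq[(Long, Long)], stepSec: Long): Long =
    sessions.map { case (s, e) => binsTouched(s, e, stepSec) }.sum
}
```

Consider the 82-minute session from 01:07 to 02:29. In seconds since midnight that is 4020 to 8940, which floors the same way as epoch seconds because a day is a whole number of intervals. It touches 3 bins at 30 minutes, 42 at 2 minutes and 83 at 1 minute. For this session that means roughly 14 to 28 times more rows.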

Any help or ideas on the best approach would be much appreciated.

Answer #1 (dwbf0jvd)

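Perhaps this helps:

Load the provided test data. (The outputs shown below were printed with a Spark session time zone of UTC+05:30, so every timestamp reads 5 h 30 min later than the input strings; with `spark.sql.session.timeZone` set to `UTC` they print as in the question.)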

    import org.apache.spark.sql.functions._
    import spark.implicits._ // for toDF and $"col"

    val df = spark.sparkContext.parallelize(List(
      ("Company B","xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
      ("Company A","xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
      ("Company B","xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
      ("Company B","xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
      ("Company A","xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
      ("Company B","xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
      ("Company B","xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
      ("Company A","xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
      ("Company A","xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
      ("Company B","xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
      ("Company A","xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
      ("Company B","xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
      ("Company A","xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
      ("Company A","xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
      ("Company A","xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")  ))
      .toDF("customer","device_model","start_timestamp","end_timestamp")
      // Parse the ISO-8601 strings explicitly; the "Z" pattern letter matches the +0000 offset.
      .withColumn("start_timestamp", to_timestamp($"start_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
      .withColumn("end_timestamp", to_timestamp($"end_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
    df.show(false)
    /**
      * +---------+------------+-------------------+-------------------+
      * |customer |device_model|start_timestamp    |end_timestamp      |
      * +---------+------------+-------------------+-------------------+
      * |Company B|xi2         |2020-07-02 06:37:00|2020-07-02 07:59:00|
      * |Company A|xi1         |2020-07-02 05:25:00|2020-07-02 06:47:00|
      * |Company B|xi2         |2020-07-02 04:01:00|2020-07-02 05:23:00|
      * |Company B|xi1         |2020-07-02 04:37:00|2020-07-02 05:59:00|
      * |Company A|xi1         |2020-07-02 03:49:00|2020-07-02 05:11:00|
      * |Company B|xi1         |2020-07-02 05:37:00|2020-07-02 06:59:00|
      * |Company B|xi1         |2020-07-02 06:25:00|2020-07-02 07:47:00|
      * |Company A|xi1         |2020-07-02 05:49:00|2020-07-02 07:11:00|
      * |Company A|xi2         |2020-07-02 04:25:00|2020-07-02 05:47:00|
      * |Company B|xi2         |2020-07-02 06:13:00|2020-07-02 07:35:00|
      * |Company A|xi2         |2020-07-02 05:01:00|2020-07-02 06:23:00|
      * |Company B|xi1         |2020-07-02 04:49:00|2020-07-02 06:11:00|
      * |Company A|xi2         |2020-07-02 05:13:00|2020-07-02 06:35:00|
      * |Company A|xi2         |2020-07-02 06:01:00|2020-07-02 07:23:00|
      * |Company A|xi2         |2020-07-02 04:13:00|2020-07-02 05:35:00|
      * +---------+------------+-------------------+-------------------+
      */
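Every timestamp in the output above is the corresponding input shifted by +05:30; for example, 01:07 UTC shows as 06:37. This happens because `df.show` renders timestamps in the session time zone. The short java.time sketch below reproduces that rendering outside Spark. `TzDisplay` and `asLocal` are hypothetical names, and the +05:30 offset is inferred from the printed values.

```scala
import java.time.{OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TzDisplay {
  // Same pattern the answer passes to to_timestamp; "Z" matches offsets such as +0000.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

  // Wall-clock rendering of an input string in another offset: the instant
  // is unchanged, only its display moves.
  def asLocal(s: String, zone: ZoneOffset): String =
    OffsetDateTime.parse(s, fmt).atZoneSameInstant(zone).toLocalDateTime.toString
}
```

For example, `TzDisplay.asLocal("2020-07-02T01:07:00.000+0000", ZoneOffset.ofHoursMinutes(5, 30))` returns `2020-07-02T06:37`, which matches the first row shown above.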

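Use sequence + interval to generate the bins/intervals.

Change intervalInMinutes as needed (for example 1 or 2 for finer bins).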

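    val intervalInMinutes = 30
    val seconds = intervalInMinutes * 60 // interval length in seconds
    // Round each start down to the start of its interval:
    // floor(epochSeconds / seconds) * seconds, converted back to a timestamp.
    val p = df.withColumn("new_start", to_timestamp(floor($"start_timestamp".cast("long") / seconds) * seconds))
      // One array element per interval the session touches: new_start, new_start + interval, ...
      // up to end_timestamp (sequence includes the stop value if it lands on a step).
      .withColumn("splits", sequence(
        $"new_start",
        $"end_timestamp",
        expr(s"interval $intervalInMinutes MINUTE")))
    p.show(false)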

    /**
      * +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
      * |customer |device_model|start_timestamp    |end_timestamp      |new_start          |splits                                                                              |
      * +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
      * |Company B|xi2         |2020-07-02 06:37:00|2020-07-02 07:59:00|2020-07-02 06:30:00|[2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]                     |
      * |Company A|xi1         |2020-07-02 05:25:00|2020-07-02 06:47:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]|
      * |Company B|xi2         |2020-07-02 04:01:00|2020-07-02 05:23:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00]                     |
      * |Company B|xi1         |2020-07-02 04:37:00|2020-07-02 05:59:00|2020-07-02 04:30:00|[2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]                     |
      * |Company A|xi1         |2020-07-02 03:49:00|2020-07-02 05:11:00|2020-07-02 03:30:00|[2020-07-02 03:30:00, 2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00]|
      * |Company B|xi1         |2020-07-02 05:37:00|2020-07-02 06:59:00|2020-07-02 05:30:00|[2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]                     |
      * |Company B|xi1         |2020-07-02 06:25:00|2020-07-02 07:47:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]|
      * |Company A|xi1         |2020-07-02 05:49:00|2020-07-02 07:11:00|2020-07-02 05:30:00|[2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00]|
      * |Company A|xi2         |2020-07-02 04:25:00|2020-07-02 05:47:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]|
      * |Company B|xi2         |2020-07-02 06:13:00|2020-07-02 07:35:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00, 2020-07-02 07:30:00]|
      * |Company A|xi2         |2020-07-02 05:01:00|2020-07-02 06:23:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00]                     |
      * |Company B|xi1         |2020-07-02 04:49:00|2020-07-02 06:11:00|2020-07-02 04:30:00|[2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00]|
      * |Company A|xi2         |2020-07-02 05:13:00|2020-07-02 06:35:00|2020-07-02 05:00:00|[2020-07-02 05:00:00, 2020-07-02 05:30:00, 2020-07-02 06:00:00, 2020-07-02 06:30:00]|
      * |Company A|xi2         |2020-07-02 06:01:00|2020-07-02 07:23:00|2020-07-02 06:00:00|[2020-07-02 06:00:00, 2020-07-02 06:30:00, 2020-07-02 07:00:00]                     |
      * |Company A|xi2         |2020-07-02 04:13:00|2020-07-02 05:35:00|2020-07-02 04:00:00|[2020-07-02 04:00:00, 2020-07-02 04:30:00, 2020-07-02 05:00:00, 2020-07-02 05:30:00]|
      * +---------+------------+-------------------+-------------------+-------------------+------------------------------------------------------------------------------------+
      */
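`sequence` includes its stop value when that value falls exactly on a step. So a session ending at, say, 15:00:00 is also counted in the 15:00 bin. Whether such a session should count as active there is a modelling choice.

The plain-Scala sketch below mirrors the inclusive behaviour. `SplitBounds` is a hypothetical name, and times are seconds since midnight. It reproduces the first `splits` row above, where 06:37 to 07:59 gives 06:30, 07:00 and 07:30. It also contrasts that with a half-open variant that only keeps bins starting strictly before the end.

```scala
object SplitBounds {
  private def floorTo(t: Long, step: Long): Long = Math.floorDiv(t, step) * step

  // Like sequence(new_start, end_timestamp, step): every bin start <= end.
  def inclusive(startSec: Long, endSec: Long, stepSec: Long): List[Long] =
    (floorTo(startSec, stepSec) to endSec by stepSec).toList

  // Half-open alternative: only bins that start strictly before end.
  def halfOpen(startSec: Long, endSec: Long, stepSec: Long): List[Long] =
    (floorTo(startSec, stepSec) until endSec by stepSec).toList
}
```

With a 14:10 to 15:00 session (51000 to 54000 seconds), `inclusive` yields 14:00, 14:30 and 15:00, while `halfOpen` drops the 15:00 bin.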

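Use pivot + count to get the count for each interval. If the device models are known in advance, pass them explicitly, e.g. `.pivot("device_model", Seq("xi1", "xi2"))`. That saves Spark the extra job it otherwise runs to find the distinct values.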

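    // One row per (session, interval) pair, then count sessions per interval, customer and model.
    p.select($"customer", $"device_model", explode($"splits").as("timeinterval"))
      .groupBy("timeinterval", "customer")
      .pivot("device_model")
      .agg(
        count("device_model")
      )
      // A model with no session in an interval comes back as null; report it as 0.
      .withColumn("xi1", coalesce($"xi1", lit(0)))
      .withColumn("xi2", coalesce($"xi2", lit(0)))
      .orderBy("timeinterval", "customer")
      .show(false)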

    /**
      * +-------------------+---------+---+---+
      * |timeinterval       |customer |xi1|xi2|
      * +-------------------+---------+---+---+
      * |2020-07-02 03:30:00|Company A|1  |0  |
      * |2020-07-02 04:00:00|Company A|1  |2  |
      * |2020-07-02 04:00:00|Company B|0  |1  |
      * |2020-07-02 04:30:00|Company A|1  |2  |
      * |2020-07-02 04:30:00|Company B|2  |1  |
      * |2020-07-02 05:00:00|Company A|2  |4  |
      * |2020-07-02 05:00:00|Company B|2  |1  |
      * |2020-07-02 05:30:00|Company A|2  |4  |
      * |2020-07-02 05:30:00|Company B|3  |0  |
      * |2020-07-02 06:00:00|Company A|2  |3  |
      * |2020-07-02 06:00:00|Company B|3  |1  |
      * |2020-07-02 06:30:00|Company A|2  |2  |
      * |2020-07-02 06:30:00|Company B|2  |2  |
      * |2020-07-02 07:00:00|Company A|1  |1  |
      * |2020-07-02 07:00:00|Company B|1  |2  |
      * |2020-07-02 07:30:00|Company B|1  |2  |
      * +-------------------+---------+---+---+
      */
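The scaling concern is that the explode step creates one row per session per bin. One swapped-in alternative, not the method used in this answer, is a sweep line (difference array). Each session contributes +1 at its first bin and -1 one step after its last bin. A running sum per (customer, model) then gives the active count in every bin. With this method, the per-bin work grows with the number of groups and bins rather than with sessions times bins.

The plain-Scala sketch below runs both approaches on the question's sample data in UTC so they can be compared. `ConcurrentCounts`, `byExplode` and `bySweep` are hypothetical names. With UTC bins, Company A at 23:30 comes out as xi1 = 2 and xi2 = 4. That is the 05:00 row of the +05:30 output above.

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

object ConcurrentCounts {
  final case class Session(customer: String, model: String, start: Long, end: Long) // epoch seconds
  type Key = (Long, String, String) // (bin start, customer, model)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
  def epoch(s: String): Long = OffsetDateTime.parse(s, fmt).toEpochSecond

  // The sample sessions from the question.
  val sample: Seq[Session] = Seq(
    ("Company B", "xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
    ("Company A", "xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
    ("Company B", "xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
    ("Company B", "xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
    ("Company A", "xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
    ("Company B", "xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
    ("Company B", "xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
    ("Company A", "xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
    ("Company A", "xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
    ("Company B", "xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
    ("Company A", "xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
    ("Company B", "xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
    ("Company A", "xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
    ("Company A", "xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
    ("Company A", "xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")
  ).map { case (c, m, s, e) => Session(c, m, epoch(s), epoch(e)) }

  private def floorTo(t: Long, step: Long): Long = Math.floorDiv(t, step) * step

  // Same idea as the answer: expand every session into the bins it touches, then count.
  def byExplode(ss: Seq[Session], step: Long): Map[Key, Int] =
    ss.flatMap(s => (floorTo(s.start, step) to s.end by step).map(b => (b, s.customer, s.model)))
      .groupBy(identity)
      .map { case (k, v) => k -> v.size }

  // Sweep line: +1 at the first bin, -1 one step after the last bin,
  // then a running sum over the bins of each (customer, model).
  def bySweep(ss: Seq[Session], step: Long): Map[Key, Int] = {
    val deltas = ss.flatMap { s =>
      val k = (s.customer, s.model)
      Seq((k, floorTo(s.start, step), 1), (k, floorTo(s.end, step) + step, -1))
    }
    deltas.groupBy(_._1).toSeq.flatMap { case ((c, m), evs) =>
      val perBin = evs.groupBy(_._2).map { case (b, e) => b -> e.map(_._3).sum }
      (perBin.keys.min to perBin.keys.max by step)
        .scanLeft((0L, 0)) { case ((_, acc), b) => (b, acc + perBin.getOrElse(b, 0)) }
        .tail
        .collect { case (b, n) if n > 0 => (b, c, m) -> n }
    }.toMap
  }
}
```

In Spark, the sweep would roughly become two rows per session and a `groupBy(...).agg(sum(...))`, followed by a running `sum` over `Window.partitionBy("customer", "device_model").orderBy(...)`. Bins with no start or end event would still have to be filled in per group. That translation is an untested assumption, not shown here.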
