pyspark对任何id的任何滑动窗口进行计数

guykilcj  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(481)

我有一个随时间推移的客户数字访问数据框架,格式如下:

  1. |cust_id|datetime|
  2. |1|2020-08-15 15:20|
  3. |1|2020-08-15 16:20|
  4. |1|2020-08-17 12:20|
  5. |1|2020-08-19 14:20|
  6. |1|2020-08-23 09:20|
  7. |2|2020-08-24 08:00|

我想挑出一些强烈的信号,比如:5天内至少拜访3次的顾客。
我最初的想法是,我们必须为每个客户计算所有滑动窗口。
在本例中,我们以cust1为例:
从2020年8月15日开始,到2020年8月19日结束,为期5天,总访问量为4次
从2020年8月16日开始,到2020年8月20日结束,为期5天,总访问量为2
从2020-08-17开始,到2020-08-21结束的5天窗口期,总访问量为2
等。
所有滑动窗口的最大计数为4。因此cust1符合“5天内至少访问3次”的标准
这似乎是一项代价高昂的行动。
你将如何有效地实现这一点?欢迎有其他想法。

6qfn3psc

6qfn3psc1#

您可以转换 datetime 列到 long 并在rangebetween()函数中传入相当于5天的秒数。

  1. from pyspark.sql.functions import *
  2. from pyspark.sql import functions as F
  3. from pyspark.sql.window import Window
  4. df = df.withColumn("date_long", to_date(substring(col("datetime"),0,10), "yyyy-MM-dd"))\
  5. .withColumn("date_long", unix_timestamp('date_long', 'yyyy-MM-dd'))
  6. days = lambda i: i * 86400
  7. w = (Window.partitionBy('cust_id').orderBy("date_long").rangeBetween(0,days(5)))
  8. df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long').show()
  9. +-------+----------------+-----------+
  10. |cust_id| datetime|5_day_visit|
  11. +-------+----------------+-----------+
  12. | 1|2020-08-15 15:20| 4|
  13. | 1|2020-08-15 16:20| 4|
  14. | 1|2020-08-17 12:20| 2|
  15. | 1|2020-08-19 14:20| 2|
  16. | 1|2020-08-23 09:20| 1|
  17. | 2|2020-08-24 08:00| 1|
  18. +-------+----------------+-----------+

要获得每位客户最多5天的访问次数,您可以执行以下操作:

  1. df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long')\
  2. .groupBy('cust_id').agg(F.max('5_day_visit').alias('max_5_day_visits')).show()
  3. +-------+----------------+
  4. |cust_id|max_5_day_visits|
  5. +-------+----------------+
  6. | 1| 4|
  7. | 2| 1|
  8. +-------+----------------+
展开查看全部

相关问题