pyspark对任何id的任何滑动窗口进行计数

guykilcj  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(450)

我有一个随时间推移的客户数字访问数据框架,格式如下:

|cust_id|datetime|
|1|2020-08-15 15:20|
|1|2020-08-15 16:20|
|1|2020-08-17 12:20|
|1|2020-08-19 14:20|
|1|2020-08-23 09:20|
|2|2020-08-24 08:00|

我想挑出一些强烈的信号,比如:5天内至少拜访3次的顾客。
我最初的想法是,我们必须为每个客户计算所有滑动窗口。
在本例中,我们以cust1为例:
从2020年8月15日开始,到2020年8月19日结束,为期5天,总访问量为4次
从2020年8月16日开始,到2020年8月20日结束,为期5天,总访问量为2
从2020-08-17开始,到2020-08-21结束的5天窗口期,总访问量为2
等。
所有滑动窗口的最大计数为4。因此cust1符合“5天内至少访问3次”的标准
这似乎是一项代价高昂的行动。
你将如何有效地实现这一点?欢迎有其他想法。

6qfn3psc

6qfn3psc1#

您可以转换 datetime 列到 long 并在rangebetween()函数中传入相当于5天的秒数。

from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = df.withColumn("date_long", to_date(substring(col("datetime"),0,10), "yyyy-MM-dd"))\
        .withColumn("date_long", unix_timestamp('date_long', 'yyyy-MM-dd'))

days = lambda i: i * 86400 
w = (Window.partitionBy('cust_id').orderBy("date_long").rangeBetween(0,days(5)))

df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long').show()
+-------+----------------+-----------+                                          
|cust_id|        datetime|5_day_visit|
+-------+----------------+-----------+
|      1|2020-08-15 15:20|          4|
|      1|2020-08-15 16:20|          4|
|      1|2020-08-17 12:20|          2|
|      1|2020-08-19 14:20|          2|
|      1|2020-08-23 09:20|          1|
|      2|2020-08-24 08:00|          1|
+-------+----------------+-----------+

要获得每位客户最多5天的访问次数,您可以执行以下操作:

df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long')\
    .groupBy('cust_id').agg(F.max('5_day_visit').alias('max_5_day_visits')).show()
+-------+----------------+                                                      
|cust_id|max_5_day_visits|
+-------+----------------+
|      1|               4|
|      2|               1|
+-------+----------------+

相关问题