python 如何使用pyspark绑定时间段?

zd287kbt  于 2023-04-19  发布在  Python
关注(0)|答案(2)|浏览(123)

我有以下(过于简化的)数据集:
| 用户标识|程序标识|程序启动|程序结束|查看开始|观察端|
| --------------|--------------|--------------|--------------|--------------|--------------|
| 0|1|2019 -01- 21 00:00:00|2023-01-01 01:00:00|2019 -01- 15 00:00:00|2019 -01- 22 00:00:00|
| 1|1|2019 -01- 21 00:00:00|2023-01-01 01:00:00|2019 -01- 22 00:00:00|2019 -01- 15 00:00:00|
我喜欢的是
| 用户标识|程序标识|程序启动|程序结束|查看开始|观察端|起动箱|端仓|
| --------------|--------------|--------------|--------------|--------------|--------------|--------------|--------------|
| 0|1|2019 -01- 21 00:00:00|2023-01-01 01:00:00|2019 -01- 15 00:00:00|2019 -01- 22 00:00:00|1|四|
| 1|1|2019 -01- 21 00:00:00|2023-01-01 01:00:00|2019 -01- 22 00:00:00|2019 -01- 15 00:00:00|二|二|
我让我的用户(userid)观看特定的资产(program_id),每个资产都有一个定义的持续时间(program_start到program_end时间)。用户可以随时开始和结束观看,只要它在program_start和program_end之间
我想做的是将我的资产分仓/分桶,比如说分成4个桶。这仍然很容易实现,但接下来我想了解用户是在这些容器中的哪个容器中开始查看资产的。
因此,如果资产从午夜运行到凌晨1点,并且用户在午夜10点开始观看,则他适合第一桶(或基本上在第一季度开始观看)。
我正在努力用pyspark来实现这一点,所以任何关于如何在不消耗我所有资源的情况下解决这个问题的建议都是有帮助的。
不是真的寻找全面的复制/粘贴解决方案,只是一些友好的建议和正确方向的指导可能足以让我继续前进。

rks48beu

rks48beu1#

完整示例:

from pyspark.sql import types as T, functions as F
df = spark.read.json(spark.sparkContext.parallelize([{"userid":0,"program_id":1,"program_start":"2023-01-01 00:00:00","program_end":"2023-01-01 01:00:00","viewing_start":"2023-01-01 00:05:00","viewing_end":"2023-01-01 00:50:00"},{"userid":1,"program_id":1,"program_start":"2023-01-01 00:00:00","program_end":"2023-01-01 01:00:00","viewing_start":"2023-01-01 00:20:00","viewing_end":"2023-01-01 00:25:00"}]), schema=T.StructType([
    T.StructField('user_id', T.IntegerType()),
    T.StructField('program_id', T.IntegerType()),
    T.StructField('program_start', T.TimestampType()),
    T.StructField('program_end', T.TimestampType()),
    T.StructField('viewing_start', T.TimestampType()),
    T.StructField('viewing_end', T.TimestampType())
]))

df.withColumn(
    'bucket_duration', 
    ((F.col('program_end') - F.col('program_start')) / 60).cast('int') / 4
).withColumn(
    'start_offset', 
    ((F.col('viewing_start') - F.col('program_start')) / 60).cast('int')
).withColumn(
    'end_offset', 
    ((F.col('viewing_end') - F.col('program_start')) / 60).cast('int')
).withColumn(
    'start_bucket',
    F.ceil(F.col('start_offset') / F.col('bucket_duration'))
).withColumn(
    'end_bucket',
    F.ceil(F.col('end_offset') / F.col('bucket_duration'))
).display()
用户ID程序标识程序启动程序结束查看开始观察端铲斗持续时间起始偏移端部偏移起动铲斗端斗
0012019 -01- 21 00:00:002023-01-01 01:00:002019 -01- 15 00:00:002019 -01- 22 00:00:00十五岁五十1
1112019 -01- 21 00:00:002023-01-01 01:00:002019 -01- 22 00:00:002019 -01- 15 00:00:00十五岁二十二十五

这个过程非常简单,它的工作原理假设观看结束不晚于节目结束。
我们计算节目持续时间,然后查看自节目开始以来用户开始/结束观看的时间,将其除以时段持续时间(总持续时间/ 4个时段),然后得到开始和结束时段。
它可以用更少的“withColumns”来完成,但我添加它们是为了清楚地说明会发生什么。
它需要调整一点,以支持在确切的时间开始或非常短的程序,但你要求一个大致的想法:)

dfty9e19

dfty9e192#

你也可以使用array来实现这个--得到bin的长度,并在一个struct数组中创建bin。
这里有一个例子

data_sdf. \
    withColumn('window_secs', (func.col('prog_end') - func.col('prog_start')).cast('long') / 4). \
    withColumn('ts_arr', func.array_repeat(func.col('prog_start').cast('long'), 4)). \
    withColumn('ts_arr', 
               func.expr('''transform(ts_arr, 
                                      (x, i) -> struct(i+1 as win, 
                                                       cast(x + i*window_secs as timestamp) as win_start, 
                                                       cast(x + (i+1)*window_secs as timestamp) as win_end
                                                       )
                                      )
                         ''')
               ). \
    withColumn('view_start_win', func.filter('ts_arr', lambda x: func.col('view_start').between(x.win_start, x.win_end))[0]). \
    withColumn('view_start_win', func.col('view_start_win.win')). \
    withColumn('view_end_win', func.filter('ts_arr', lambda x: func.col('view_end').between(x.win_start, x.win_end))[0]). \
    withColumn('view_end_win', func.col('view_end_win.win')). \
    show(truncate=False)

# +-----+------+-------------------+-------------------+-------------------+-------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+------------+
# |usrid|progid|prog_start         |prog_end           |view_start         |view_end           |window_secs|ts_arr                                                                                                                                                                                      |view_start_win|view_end_win|
# +-----+------+-------------------+-------------------+-------------------+-------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+------------+
# |0    |1     |2023-01-01 00:00:00|2023-01-01 01:00:00|2023-01-01 00:05:00|2023-01-01 00:50:00|900.0      |[{1, 2023-01-01 00:00:00, 2023-01-01 00:15:00}, {2, 2023-01-01 00:15:00, 2023-01-01 00:30:00}, {3, 2023-01-01 00:30:00, 2023-01-01 00:45:00}, {4, 2023-01-01 00:45:00, 2023-01-01 01:00:00}]|1             |4           |
# |1    |1     |2023-01-01 00:00:00|2023-01-01 01:00:00|2023-01-01 00:20:00|2023-01-01 00:25:00|900.0      |[{1, 2023-01-01 00:00:00, 2023-01-01 00:15:00}, {2, 2023-01-01 00:15:00, 2023-01-01 00:30:00}, {3, 2023-01-01 00:30:00, 2023-01-01 00:45:00}, {4, 2023-01-01 00:45:00, 2023-01-01 01:00:00}]|2             |2           |
# +-----+------+-------------------+-------------------+-------------------+-------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+------------+

相关问题