Computing duration within groups in PySpark

Asked by 5w9g7ksd on 2022-11-01 in Spark

I want to compute the duration within each group that shares the same date_id, subs_no, year, month and day. The first entry of a group should just show "first".
Here is my dataset:

+--------+---------------+--------+----+-----+---+
| date_id|             ts| subs_no|year|month|day|
+--------+---------------+--------+----+-----+---+
|20200801|14:27:18.000000|10007239|2022|    6|  1|
|20200801|14:29:44.000000|10054647|2022|    6|  1|
|20200801|08:24:21.000000|10057750|2022|    6|  1|
|20200801|13:49:27.000000|10019958|2022|    6|  1|
|20200801|20:07:32.000000|10019958|2022|    6|  1|
+--------+---------------+--------+----+-----+---+

Note: the "ts" column is of string type.
Here is my expected output:

+--------+---------------+--------+----+-----+---+--------+
| date_id|             ts| subs_no|year|month|day|duration|
+--------+---------------+--------+----+-----+---+--------+
|20200801|14:27:18.000000|10007239|2022|    6|  1|   first|
|20200801|14:29:44.000000|10054647|2022|    6|  1|   first|
|20200801|08:24:21.000000|10057750|2022|    6|  1|   first|
|20200801|13:49:27.000000|10019958|2022|    6|  1|   first|
|20200801|20:07:32.000000|10019958|2022|    6|  1| 6:18:05|
+--------+---------------+--------+----+-----+---+--------+

Answer 1 (j2datikz)

You could try concatenating several of the columns into a single column that represents the actual timestamp. Then compute against it using min as a window function. Finally, replace the duration "00:00:00" with "first".
Input:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('20200801', '14:27:18.000000', '10007239', 2022, 6, 1),
     ('20200801', '14:29:44.000000', '10054647', 2022, 6, 1),
     ('20200801', '08:24:21.000000', '10057750', 2022, 6, 1),
     ('20200801', '13:49:27.000000', '10019958', 2022, 6, 1),
     ('20200801', '20:07:32.000000', '10019958', 2022, 6, 1)],
    ['date_id', 'ts', 'subs_no', 'year', 'month', 'day'])

Script:

# Combine year, month, day and ts into a real timestamp.
ts = F.to_timestamp(F.format_string('%d-%d-%d %s', 'year', 'month', 'day', 'ts'))

# Within each group, the earliest timestamp is the reference point.
w = W.partitionBy('date_id', 'subs_no', 'year', 'month', 'day').orderBy(ts)

df = df.withColumn(
    'duration',
    # Subtracting timestamps yields an interval; pull out its HH:mm:ss part.
    F.regexp_extract(ts - F.min(ts).over(w), r'\d\d:\d\d:\d\d', 0)
)
# The first row of every group has a zero duration; label it 'first'.
df = df.replace('00:00:00', 'first', 'duration')

df.show()

# +--------+---------------+--------+----+-----+---+--------+
# |date_id |ts             |subs_no |year|month|day|duration|
# +--------+---------------+--------+----+-----+---+--------+
# |20200801|14:27:18.000000|10007239|2022|6    |1  |first   |
# |20200801|13:49:27.000000|10019958|2022|6    |1  |first   |
# |20200801|20:07:32.000000|10019958|2022|6    |1  |06:18:05|
# |20200801|14:29:44.000000|10054647|2022|6    |1  |first   |
# |20200801|08:24:21.000000|10057750|2022|6    |1  |first   |
# +--------+---------------+--------+----+-----+---+--------+
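
If you prefer not to rely on the string form of the interval produced by the timestamp subtraction, the same window logic can also be written with unix_timestamp and an explicit HH:mm:ss format. A minimal sketch, assuming the df, ts and w defined in the script above; elapsed and df2 are just illustrative names:

# Elapsed seconds since the earliest event of the group.
elapsed = F.unix_timestamp(ts) - F.min(F.unix_timestamp(ts)).over(w)

df2 = df.withColumn(
    'duration',
    F.when(elapsed == 0, F.lit('first'))
     .otherwise(F.format_string('%02d:%02d:%02d',
                                F.floor(elapsed / 3600),        # hours
                                F.floor(elapsed % 3600 / 60),   # minutes
                                elapsed % 60))                  # seconds
)
df2.show(truncate=False)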

Answer 2 (jjjwad0x)

Use window functions. The code and logic are below.

from pyspark.sql import Window
from pyspark.sql.functions import col, first, regexp_extract, to_timestamp, when

# Order each group chronologically; ordering by 'ts' works because the HH:mm:ss string sorts like a time.
w = Window.partitionBy('date_id', 'subs_no', 'year', 'month', 'day').orderBy('ts')

new = (df.withColumn('ty', to_timestamp('ts'))  # coerce the string to a timestamp
       .withColumn('duration',
                   # label the earliest row of the group 'first'; otherwise subtract it and keep the HH:mm:ss part
                   when(first('ty').over(w) == col('ty'), 'first')
                   .otherwise(regexp_extract(col('ty') - first('ty').over(w), r'\d{2}:\d{2}:\d{2}', 0)))
       .drop('ty')
       .orderBy('date_id', 'ts', 'subs_no', 'year', 'month'))
new.show()

+--------+---------------+--------+----+-----+---+--------+
| date_id|             ts| subs_no|year|month|day|duration|
+--------+---------------+--------+----+-----+---+--------+
|20200801|08:24:21.000000|10057750|2022|    6|  1|   first|
|20200801|13:49:27.000000|10019958|2022|    6|  1|   first|
|20200801|14:27:18.000000|10007239|2022|    6|  1|   first|
|20200801|14:29:44.000000|10054647|2022|    6|  1|   first|
|20200801|20:07:32.000000|10019958|2022|    6|  1|06:18:05|
+--------+---------------+--------+----+-----+---+--------+
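
One caveat worth noting (my understanding of Spark's timestamp cast, so treat it as an assumption): to_timestamp on a time-only string such as '14:27:18.000000' fills in the current date, so the differences are only meaningful within a single day. A sketch that keeps the first()-based logic but builds the timestamp from year, month, day and ts, as the first answer does; full_ts, w2 and out are illustrative names:

from pyspark.sql import functions as F, Window as W

# Full timestamp built from the date columns, as in the first answer.
full_ts = F.to_timestamp(F.format_string('%d-%d-%d %s', 'year', 'month', 'day', 'ts'))
w2 = W.partitionBy('date_id', 'subs_no', 'year', 'month', 'day').orderBy(full_ts)

out = (df.withColumn('ty', full_ts)
       .withColumn('duration',
                   # row_number() marks the earliest row of each group unambiguously
                   F.when(F.row_number().over(w2) == 1, 'first')
                    .otherwise(F.regexp_extract(F.col('ty') - F.first('ty').over(w2),
                                                r'\d{2}:\d{2}:\d{2}', 0)))
       .drop('ty'))
out.show(truncate=False)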
