Splitting a list of overlapping intervals into non-overlapping sub-intervals in a PySpark DataFrame

Asked by xqk2d5yq on 2021-07-13, tagged Spark

I have a PySpark DataFrame with start_time and end_time columns that define an interval for each row.
There is also a rate column, and I want to know whether sub-intervals (which by definition overlap) carry different rate values; if so, I want to treat the most recent record as the ground truth.
Input:


# So this:

from pyspark.sql import Row

input_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-04 00:00:00', rate=10),  # OVERLAP: (1,4) and (2,3) and (3,5) and rate=10/20
              Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10),  # OVERLAP: full overlap for (2,3) with (1,4)               
              Row(start_time='2018-01-03 00:00:00', end_time='2018-01-05 00:00:00', rate=20),  # OVERLAP: (3,5) and (1,4) and rate=10/20                          
              Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30),  # NO OVERLAP: hole between (5,6)                                            
              Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30)]  # NO OVERLAP

df = spark.createDataFrame(input_rows)
df.show()
>>> +-------------------+-------------------+----+
    |         start_time|           end_time|rate|
    +-------------------+-------------------+----+
    |2018-01-01 00:00:00|2018-01-04 00:00:00|  10|
    |2018-01-02 00:00:00|2018-01-03 00:00:00|  10|
    |2018-01-03 00:00:00|2018-01-05 00:00:00|  20|
    |2018-01-06 00:00:00|2018-01-07 00:00:00|  30|
    |2018-01-07 00:00:00|2018-01-08 00:00:00|  30|
    +-------------------+-------------------+----+

# To give you:

output_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-02 00:00:00', rate=10),
               Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10),
               Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=20),
               Row(start_time='2018-01-04 00:00:00', end_time='2018-01-05 00:00:00', rate=20),
               Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30),
               Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30)
              ]
final_df = spark.createDataFrame(output_rows)
final_df.show()
>>> 
+-------------------+-------------------+----+
|         start_time|           end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00|  10|
|2018-01-02 00:00:00|2018-01-03 00:00:00|  10|
|2018-01-03 00:00:00|2018-01-04 00:00:00|  20|
|2018-01-04 00:00:00|2018-01-05 00:00:00|  20|
|2018-01-06 00:00:00|2018-01-07 00:00:00|  30|
|2018-01-07 00:00:00|2018-01-08 00:00:00|  30|
+-------------------+-------------------+----+

Answer 1 (wsxa1bj1)

You can compare each row's end_time with the next row's start_time; if the next start_time is earlier than the current end_time, replace the end_time with that next start_time.

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'end_time2',
    # earliest start_time among all rows that follow the current one
    F.min('start_time').over(
        Window.orderBy('start_time')
              .rowsBetween(1, Window.unboundedFollowing)
    )
).select(
    'start_time',
    F.when(
        F.col('end_time2') < F.col('end_time'),  # the next interval starts inside this one
        F.col('end_time2')                       # so truncate end_time to that start
    ).otherwise(
        F.col('end_time')
    ).alias('end_time'),
    'rate'
)

df2.show()
+-------------------+-------------------+----+
|         start_time|           end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00|  10|
|2018-01-02 00:00:00|2018-01-03 00:00:00|  10|
|2018-01-03 00:00:00|2018-01-05 00:00:00|  20|
|2018-01-06 00:00:00|2018-01-07 00:00:00|  30|
|2018-01-07 00:00:00|2018-01-08 00:00:00|  30|
+-------------------+-------------------+----+
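A possibly equivalent shorthand (my own sketch, not part of the original answer): because the window is ordered by start_time, the earliest following start_time is simply the next row's start_time, so F.lead combined with F.least should give the same truncation. F.least skips nulls, so the last row keeps its own end_time.

from pyspark.sql import functions as F, Window

w = Window.orderBy('start_time')
df3 = df.select(
    'start_time',
    # truncate end_time to the next row's start_time when they overlap;
    # least() ignores the null that lead() produces on the last row
    F.least('end_time', F.lead('start_time').over(w)).alias('end_time'),
    'rate'
)
df3.show()

Note that both variants use a window without partitionBy, so Spark will move all rows to a single partition; on a large table you would want to partition the window by some grouping key first.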
