I have a pyspark DataFrame with columns start_time and end_time that define an interval for each row.
It also contains a column is_duplicated, set to True if an interval is overlapped by at least one other interval, and to False if it is not.
There is also a column rate, and I want to know whether overlapping sub-intervals carry different rate values; if they do, I want to keep the record with the latest update (column updated_at) as the ground truth.
As an intermediate step, I would like to create a column is_validated set to:
- None when the sub-interval is not overlapped
- True when the sub-interval is overlapped by another sub-interval carrying a different rate value and it is the most recently updated one
- False when the sub-interval is overlapped by another sub-interval carrying a different rate value and it is not the most recently updated one
Note: the intermediate step is not mandatory; I only provide it to make the explanation clearer.
Input:
# So this:
from pyspark.sql import Row

input_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-04 00:00:00', rate=10, updated_at='2021-02-25 00:00:00'), # OVERLAP: (1,4) and (2,3) and (3,5) and rate=10/20
Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10, updated_at='2021-02-25 00:00:00'), # OVERLAP: full overlap for (2,3) with (1,4)
Row(start_time='2018-01-03 00:00:00', end_time='2018-01-05 00:00:00', rate=20, updated_at='2021-02-20 00:00:00'), # OVERLAP: (3,5) and (1,4) and rate=10/20
Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30, updated_at='2021-02-25 00:00:00'), # NO OVERLAP: hole between (5,6)
Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30, updated_at='2021-02-25 00:00:00')] # NO OVERLAP
df = spark.createDataFrame(input_rows)
df.show()
>>>
+-------------------+-------------------+----+-------------------+
| start_time| end_time|rate| updated_at|
+-------------------+-------------------+----+-------------------+
|2018-01-01 00:00:00|2018-01-04 00:00:00| 10|2021-02-25 00:00:00|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|2021-02-25 00:00:00|
|2018-01-03 00:00:00|2018-01-05 00:00:00| 20|2021-02-20 00:00:00|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|2021-02-25 00:00:00|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|2021-02-25 00:00:00|
+-------------------+-------------------+----+-------------------+
# Will become:
tmp_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-02 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None),
Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=True, is_validated=True),
Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10, updated_at='2021-02-25 00:00:00', is_duplicated=True, is_validated=True),
Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=10, updated_at='2021-02-20 00:00:00', is_duplicated=True, is_validated=False),
Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=20, updated_at='2021-02-25 00:00:00', is_duplicated=True, is_validated=True),
Row(start_time='2018-01-04 00:00:00', end_time='2018-01-05 00:00:00', rate=20, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None),
Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None),
Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30, updated_at='2021-02-25 00:00:00', is_duplicated=False, is_validated=None)
]
tmp_df = spark.createDataFrame(tmp_rows)
tmp_df.show()
>>>
+-------------------+-------------------+----+-------------------+-------------+------------+
| start_time| end_time|rate| updated_at|is_duplicated|is_validated|
+-------------------+-------------------+----+-------------------+-------------+------------+
|2018-01-01 00:00:00|2018-01-02 00:00:00| 10|2021-02-25 00:00:00| false| null|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|2021-02-25 00:00:00| true| true|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|2021-02-25 00:00:00| true| true|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 10|2021-02-20 00:00:00| true| false|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 20|2021-02-25 00:00:00| true| true|
|2018-01-04 00:00:00|2018-01-05 00:00:00| 20|2021-02-25 00:00:00| false| null|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|2021-02-25 00:00:00| false| null|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|2021-02-25 00:00:00| false| null|
+-------------------+-------------------+----+-------------------+-------------+------------+
# To give you:
output_rows = [Row(start_time='2018-01-01 00:00:00', end_time='2018-01-02 00:00:00', rate=10),
Row(start_time='2018-01-02 00:00:00', end_time='2018-01-03 00:00:00', rate=10),
Row(start_time='2018-01-03 00:00:00', end_time='2018-01-04 00:00:00', rate=20),
Row(start_time='2018-01-04 00:00:00', end_time='2018-01-05 00:00:00', rate=20),
Row(start_time='2018-01-06 00:00:00', end_time='2018-01-07 00:00:00', rate=30),
Row(start_time='2018-01-07 00:00:00', end_time='2018-01-08 00:00:00', rate=30)
]
final_df = spark.createDataFrame(output_rows)
final_df.show()
>>>
+-------------------+-------------------+----+
| start_time| end_time|rate|
+-------------------+-------------------+----+
|2018-01-01 00:00:00|2018-01-02 00:00:00| 10|
|2018-01-02 00:00:00|2018-01-03 00:00:00| 10|
|2018-01-03 00:00:00|2018-01-04 00:00:00| 20|
|2018-01-04 00:00:00|2018-01-05 00:00:00| 20|
|2018-01-06 00:00:00|2018-01-07 00:00:00| 30|
|2018-01-07 00:00:00|2018-01-08 00:00:00| 30|
+-------------------+-------------------+----+
2 Answers
093gszye1 #1
This works:
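A minimal sketch of one approach along these lines is given below; the helper names (boundaries, sub_intervals, exploded) and the exact steps are illustrative assumptions, not necessarily what this answer used. The idea is to cut every interval at all start/end boundaries so that overlapping rows can be compared on identical sub-intervals.

from pyspark.sql import functions as F, Window

# Collect every distinct start/end timestamp; these are the cut points.
# (Timestamps are ISO-formatted strings here, so lexicographic order matches
#  chronological order; casting to timestamp would be safer in general.)
boundaries = (df.select(F.col("start_time").alias("ts"))
                .union(df.select("end_time"))
                .distinct())

# Pair consecutive cut points into candidate sub-intervals.
# A global orderBy window is fine for small data, but it moves everything to one partition.
w = Window.orderBy("ts")
sub_intervals = (boundaries
                 .withColumn("next_ts", F.lead("ts").over(w))
                 .where(F.col("next_ts").isNotNull())
                 .select(F.col("ts").alias("sub_start"),
                         F.col("next_ts").alias("sub_end")))

# Keep only the sub-intervals covered by an original row, carrying that row's rate.
# Gaps such as (2018-01-05, 2018-01-06) match no row and disappear here.
exploded = (df.join(sub_intervals,
                    (sub_intervals.sub_start >= df.start_time) &
                    (sub_intervals.sub_end <= df.end_time))
              .select("sub_start", "sub_end", "rate", "updated_at"))

# Optional: the question's is_duplicated / is_validated intermediate columns.
w_sub = Window.partitionBy("sub_start", "sub_end")
tmp_df = (exploded
          .withColumn("is_duplicated", F.count("*").over(w_sub) > 1)
          .withColumn("is_validated",
                      F.when(~F.col("is_duplicated"), F.lit(None).cast("boolean"))
                       .otherwise(F.col("updated_at") == F.max("updated_at").over(w_sub))))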
h9a6wy2h2 #2
You can explode the sequence of timestamps, just like in your intermediate dataframe, and then group by start time and end time to get the most recent rate based on the update time.
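A minimal sketch of that grouping step, assuming a dataframe shaped like the intermediary one above (it reuses the illustrative exploded dataframe from the sketch under the first answer):

from pyspark.sql import functions as F

# For each sub-interval, keep the rate from the most recently updated source row.
# max over a struct orders by its first field (updated_at), so this picks the latest rate.
# (On Spark 3.3+, F.max_by("rate", "updated_at") is an equivalent alternative.)
final_df = (exploded
            .groupBy("sub_start", "sub_end")
            .agg(F.max(F.struct("updated_at", "rate")).alias("latest"))
            .select(F.col("sub_start").alias("start_time"),
                    F.col("sub_end").alias("end_time"),
                    F.col("latest.rate").alias("rate"))
            .orderBy("start_time"))
final_df.show()

Applied to the example input, this should yield one row per covered sub-interval with the most recently updated rate, matching the expected final_df shown in the question.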