在pyspark中,我有一个类似于以下示例的框架:
id, execution_time, sym, qty
========================================
1, 2023-10-27 15:01:24.2200, aa1, 100
2, 2023-10-27 15:15:21.2200, aa1, 250
3, 2023-10-27 15:27:24.2200, aa2, 350
4, 2023-10-27 15:35:25.2200, aa3, 400
5, 2023-10-27 16:00.25.2200, aa3, 500
6, 2023-10-27 16:15:24.2200, aa4, 100
7, 2023-10-27 16:55:24.2200, aa1, 100
8, 2023-10-27 16:50:24.2200, aa2, 100
========================================
字符串
现在我的要求是:我有一个'duration'变量,这个变量的值是30 #分钟现在从第一行开始,我需要应用duration变量的值,然后我需要像下面这样对这些行进行分组-所以,在这个样本数据中,在应用'duration'变量之后,我应该可以分组到第三行。因为第四行的时间大于第一行+持续时间。(我们在第一行应用了持续时间)
现在我需要再次从第4行开始并应用duration变量,这次我们应该只对第4行和第5行进行分组,因为第6行的时间大于第4行+ duration。
现在我需要再次从第6行开始并应用持续时间变量,这次我们应该只对第6行进行分组,因为第7行的时间大于第6行+持续时间。
换句话说:因此,在对一行的time列应用duration之后(假设这是我们的结果),我们需要选择下一行的time > result的所有即将到来的行,然后选择下一行并应用duration。
是否可以标记所有这些行,并将其存储在一个新列中,这福尔斯符合上述条件?因为稍后我需要进行聚合。
2条答案
按热度按时间y53ybaqx1#
我试着用简单的方式做这件事。
1.首先创建一个Window spec,这样我们就可以收集所有属于window_spec的id。
1.获取这些收集的id列表的计数并将其存储在列中。
1.提取出id并计数到一个单独的列表中进行顺序迭代处理,因为它很复杂,不能直接在Window函数中实现。
1.根据需要处理提取的元组列表。
1.从上面处理的列表中创建一个框架。
下面是python脚本:
字符串
输出量:
型
k3fezbri2#
你可以收集所有的日期行作为一个结构体列表,然后使用
aggregate
函数来匹配结束时间。字符串
aggregate
函数的工作方式类似于python的reduce
。它递归地将逻辑应用于数组的元素。在这种情况下,我不断地用exec_time
检查结束日期,如果exec_time
大于当前结束日期,则使用exec_time
来计算结束日期。型