我必须在pyspark的滑动窗口中执行聚合。特别是,我必须执行以下操作:
一次考虑100天的数据
按给定的id列分组
取聚合的最后一个值
求和并返回结果
这些任务必须在具有 .rangeBetween(-100 days, 0)
我可以通过构造一个pandas udf轻松实现这个结果,该udf将pyspark df的一些列作为输入,将它们转换为pandasDataframe,然后计算聚合并返回标量结果。然后将自定义项应用于所需的滑动窗口。
尽管这个解决方案工作得很好,但由于dfs包含数百万行,完成任务需要花费大量时间(3-4小时)。有没有办法提高这种运算的计算时间?我和pyspark一起在databricks工作。
我的自定义项是:
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()
适用于:
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
编辑:我知道这种聚合可以通过使用numpy数组来实现,而pyspark udf使用numpy结构要快得多。但是,我希望避免使用这种解决方案,因为我需要在相同的框架中应用函数,这些函数比显示的函数复杂得多,而且很难用numpy复制。
1条答案
按热度按时间4dbbbstv1#
我最近不得不实现一个类似的聚合,我的第一个尝试是使用带有滑动窗口的pandas udf。性能非常差,我通过使用以下方法来改进它。
尝试使用
collect_list
合成滑动窗口向量,然后用自定义项Map它们。请注意,只有当滑动窗口可以放入workers内存时(通常是这样)。这是我的测试代码。第一部分只是你的代码,但作为一个完整的可复制的例子。
建议替代方案:
结果: