pyspark上的动态窗口

7uhlpewt  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(329)

我目前在pyspark上面临以下问题。我需要在行上创建一个滚动窗口,在另一行的值上应用一个函数。这个窗口通常是动态的,如果找到某个值就“重启”。我们可以在示例中看到:
“组列”“应用程序列”“输出列”12223532712223533236284210
在本例中,我们使用值1作为重新启动值,我们在application列上应用sum(但它可以是任何函数)。主要的问题是我们不知道团队的规模。正如我们在示例中看到的,大小是不同的。
我使用pandas编写了一个python代码(但我大多数使用pyspark)。然而,它需要几分钟来执行,我需要一个代码,可以执行得更好。
我的代码:
df->pyspsarkDataframe
应用列->将应用该方法的列
分组列->将用于分组值的列
输出列->输出列的名称
方法->将应用于列的方法
开始值->计数将重新开始的值

  1. def ave_like_method(df, apply_column, group_column, output_column, method, start_value=None):
  2. df = df.toPandas()
  3. if start_value is None:
  4. start_value = df[group_column].min()
  5. new_values = []
  6. index = 0
  7. iteration_values = []
  8. df[apply_column] = df[apply_column].astype(float)
  9. df[group_column] = df[group_column].astype(float)
  10. for apply, group in df[[apply_column, group_column]].values:
  11. if group == start_value:
  12. # in this case we can restart the "counting"
  13. if index != 0:
  14. # when the index is different of 0 then,
  15. # we can group the data and apply the method
  16. # applying the method over a list
  17. results = method(np.array(iteration_values))
  18. if isinstance(results, float) or isinstance(results, int):
  19. results = [results] * len(iteration_values)
  20. new_values.extend(results)
  21. iteration_values = []
  22. iteration_values.append(apply)
  23. index += 1
  24. # in the final aggregation the list does not return to it first value
  25. results = method(np.array(iteration_values))
  26. if isinstance(results, float) or isinstance(results, int):
  27. results = [results] * len(iteration_values)
  28. new_values.extend(results)
  29. print(len(new_values), len(df))
  30. df[output_column] = new_values
  31. # converting pandas dataframe to pyspawrk frame
  32. df = pandas_to_spark(df)
  33. return df

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题