我目前在pyspark上面临以下问题。我需要在行上创建一个滚动窗口,在另一行的值上应用一个函数。这个窗口通常是动态的,如果找到某个值就“重启”。我们可以在示例中看到:
“组列”“应用程序列”“输出列”12223532712223533236284210
在本例中,我们使用值1作为重新启动值,我们在application列上应用sum(但它可以是任何函数)。主要的问题是我们不知道团队的规模。正如我们在示例中看到的,大小是不同的。
我使用pandas编写了一个python代码(但我大多数使用pyspark)。然而,它需要几分钟来执行,我需要一个代码,可以执行得更好。
我的代码:
df->pyspsarkDataframe
应用列->将应用该方法的列
分组列->将用于分组值的列
输出列->输出列的名称
方法->将应用于列的方法
开始值->计数将重新开始的值
def ave_like_method(df, apply_column, group_column, output_column, method, start_value=None):
df = df.toPandas()
if start_value is None:
start_value = df[group_column].min()
new_values = []
index = 0
iteration_values = []
df[apply_column] = df[apply_column].astype(float)
df[group_column] = df[group_column].astype(float)
for apply, group in df[[apply_column, group_column]].values:
if group == start_value:
# in this case we can restart the "counting"
if index != 0:
# when the index is different of 0 then,
# we can group the data and apply the method
# applying the method over a list
results = method(np.array(iteration_values))
if isinstance(results, float) or isinstance(results, int):
results = [results] * len(iteration_values)
new_values.extend(results)
iteration_values = []
iteration_values.append(apply)
index += 1
# in the final aggregation the list does not return to it first value
results = method(np.array(iteration_values))
if isinstance(results, float) or isinstance(results, int):
results = [results] * len(iteration_values)
new_values.extend(results)
print(len(new_values), len(df))
df[output_column] = new_values
# converting pandas dataframe to pyspawrk frame
df = pandas_to_spark(df)
return df
暂无答案!
目前还没有任何答案,快来回答吧!