如何从pyspark分区的内存中删除唯一的id?

qoefvg9y  于 9个月前  发布在  Apache
关注(0)|答案(1)|浏览(83)

enter image description here
上面是pyspark数据框架。我需要添加一个新列-“new_col”,其中我需要应用下面的转换-
1.当差值大于0时,则选择与组B分区内最后一个差值列中的值一样多的row_id
1.当差值小于0时,则在组A分区内从开始处选择与差值列中的值一样多的row_id
1.当difference为0时,则选择所有row_id
例如,如果差值为2,则需要从组B的最后一个值即4,5中选取值。类似地,如果差值为-2,则需要从组A的idx值即1,2中选取值
nested_window = Window().partitionBy('Team Name').orderBy('row_id')df = df.withColumn("new_col", F.when(df['difference'] > 0, F.last(df['row_id']).over(nested_window))) )
我只得到了最后一个值,而不是期望差值>0时的多个值

zlwx9yxi

zlwx9yxi1#

我试过Window(),但无法得到多个值。所以,使用UDF,你可以像下面这样实现你的要求。

样本起始数据:

首先,使用group by并将row_id的排序数组作为一个新列并将其存储在一个新的嵌套框架中。

from pyspark.sql.types import *
from pyspark.sql.functions import col, sort_array,collect_list

#Create another dataframe with required row_id list
df2=df.groupBy("Team Name").agg(collect_list("row_id").alias("new_col")).withColumn("new_col",sort_array("new_col"))

#Join it with our dataframe
Joined_df = df.join(df2,  'Team Name',  'inner')
Joined_df.show()

它将给予像下面这样的框架。

现在,为case条件创建一个UDF,并像下面这样从框架中调用它。

# Function to select values based on 'difference' returns the required array
@udf(ArrayType(IntegerType()))
def get_values(arr, diff):
    if diff > 0:
        return arr[-diff:]
    elif diff < 0:
        return arr[:abs(diff)]
    return arr

# Get the required row_id array using the udf
Joined_df.withColumn("new_col", get_values(Joined_df["new_col"], Joined_df["difference"])).show()

结果:

相关问题