python—通过条件pyspark将列中的多个值相互相减

k0pti3hp  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(683)

我正在为一些我认为应该很琐碎的事情而挣扎。
我有一个输入Dataframe:
idmonthsfeature\ AFFEATURE\ BFFEATURE\ c11213163242121326324
现在我想按id分组,并用月份为6的值减去月份为1的值。产生如下输出Dataframe:
I特征\U A特征\U B特征\U c11112111
现在,我使用以下代码来实现这一点:

def get_month_diff(df, start_month=1, end_month=6):

    columns_to_agg = ['feature_a', 'feature_b', 'feature_c']
    result = (df
               .groupby('id')
               .pivot('months')
               .agg(*[F.sum(col).alias(f'{col}') for col in columns_to_agg])
               )
    #  Pyspark doesnt work nice with columns that have '.'s in them
    result = result.toDF(*(c.replace('.', '_') for c in result.columns))

    for col in columns_to_agg:
        result = result.withColumn(col, result[f"{end_month}_0_{col}"] - result[f"{start_month}_0_{col}"])

    return result

我不喜欢这样,我必须从一列中减去另一列,然后在第一个spark变换之外创建这些新列。所以我在寻找解决办法。
因此,我问是否有人可以帮助我在正确的方向上解决这个问题?

v8wbuo2f

v8wbuo2f1#

如果你使用 maxwhen 要获取第1个月和第6个月的值:

import pyspark.sql.functions as F

df2 = df.groupBy('id').agg(*[
    (
        F.max(F.when(F.col('months') == 6, F.col(c))) - 
        F.max(F.when(F.col('months') == 1, F.col(c)))
    ).alias(c) 
    for c in df.columns[2:]
])

df2.show()
+---+---------+---------+---------+
| id|feature_a|feature_b|feature_c|
+---+---------+---------+---------+
|  1|        1|        1|        1|
|  2|        1|        1|        1|
+---+---------+---------+---------+

相关问题