如何在pyspark中将函数转换为自定义项?

8wigbo56  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(322)

我有一个python函数,我想适应pyspark。我对pyspark还很陌生,所以找到一个实现这个的方法——不管是使用udf还是在pyspark中实现,都是一个挑战。
本质上,它执行一系列 numpy 按Dataframe分组的计算。我不完全确定在Pypark中这样做的最佳方式
python代码:

data = [
    [1, "a", 10, 23, 33],
    [1, "b", 11, 25, 34],
    [1, "c", 12, 35, 35],
    [1, "d", 13, 40, 36],
    [2, "e", 14, 56, 38],
    [2, "g", 14, 56, 39],
    [2, "g", 16, 40, 38],
    [2, "g", 19, 87, 90],
    [3, "a", 20, 12, 90],
    [3, "a", 21, 45, 80],
    [3, "b", 21, 45, 38],
    [3, "c", 12, 45, 67],
    [3, "d", 18, 45, 78],
    [3, "d", 12, 78, 90],
    [3, "d", 8, 85, 87],
    [3, "d", 19, 87, 89],
]
df = pd.DataFrame(data, columns=["id", "sub_id", "sub_sub_id", "value_1", "value_2"])
df
grouped_df = df.groupby(["id", "sub_id", "sub_sub_id"])

aggregated_df = grouped_df.agg(
            {
                "value_1": ["mean", "std"],
                "value_2": ["mean", "std"],
            }
        ).reset_index()
for value in ["value_1", "value_2"]:
    aggregated_df[f"{value}_calc"] = np.maximum(
        aggregated_df[value]["mean"]
        - grouped_df[value].min().values,
        grouped_df[value].max().values
        - aggregated_df[value]["mean"],
    )

我想表演一个 Window 与已经分组和聚合的sparkDataframe一起运行,但我非常确定这不是最好的方法。

test = aggregated_sdf.withColumn(
    "new_calculated_value",
    spark_fns.max(
        spark_fns.expr(
            "ave_value_1" - spark_fns.min(spark_fns.collect_list("ave_value_1"))
        ),
        (
            spark_fns.expr(
                spark_fns.max(spark_fns.collect_list("ave_value_1")) - "ave_value_1"
            )
        ),
    ).over(Window.partitionBy("id", "sub_id", "sub_sub_id"))
ylamdve6

ylamdve61#

您可以尝试在聚合期间进行计算,类似于您在pandas代码中所做的。相当于 np.maximum 应该是 F.greatest . F.max 是一个聚合函数,它获取列中的最大值,而 F.greatest 不是聚合函数,并获取一行中最多几列的值。

import pyspark.sql.functions as F

df2 = df.groupby("id", "sub_id", "sub_sub_id").agg(
    F.mean('value_1').alias('ave_value_1'), 
    F.mean('value_2').alias('ave_value_2'), 
    F.greatest(
        F.mean('value_1') - F.min('value_1'), 
        F.max('value_1') - F.mean('value_1')
    ).alias('value_1_calc'), 
    F.greatest(
        F.mean('value_2') - F.min('value_2'), 
        F.max('value_2') - F.mean('value_2')
    ).alias('value_2_calc')
)

相关问题