I have a Python function that I would like to adapt to PySpark. I'm still fairly new to PySpark, so finding a way to implement this, whether with a udf or natively in PySpark, has been a challenge.
Essentially, it performs a series of numpy calculations on a grouped DataFrame. I'm not entirely sure of the best way to do this in PySpark.
Python code:
import numpy as np
import pandas as pd

data = [
[1, "a", 10, 23, 33],
[1, "b", 11, 25, 34],
[1, "c", 12, 35, 35],
[1, "d", 13, 40, 36],
[2, "e", 14, 56, 38],
[2, "g", 14, 56, 39],
[2, "g", 16, 40, 38],
[2, "g", 19, 87, 90],
[3, "a", 20, 12, 90],
[3, "a", 21, 45, 80],
[3, "b", 21, 45, 38],
[3, "c", 12, 45, 67],
[3, "d", 18, 45, 78],
[3, "d", 12, 78, 90],
[3, "d", 8, 85, 87],
[3, "d", 19, 87, 89],
]
df = pd.DataFrame(data, columns=["id", "sub_id", "sub_sub_id", "value_1", "value_2"])
df
grouped_df = df.groupby(["id", "sub_id", "sub_sub_id"])
aggregated_df = grouped_df.agg(
    {
        "value_1": ["mean", "std"],
        "value_2": ["mean", "std"],
    }
).reset_index()

for value in ["value_1", "value_2"]:
    aggregated_df[f"{value}_calc"] = np.maximum(
        aggregated_df[value]["mean"] - grouped_df[value].min().values,
        grouped_df[value].max().values - aggregated_df[value]["mean"],
    )
I thought about doing this with a Window function over the already grouped and aggregated Spark DataFrame, but I'm fairly sure this is not the best way to do it (a runnable sketch of this windowed idea follows the snippet below).
test = aggregated_sdf.withColumn(
    "new_calculated_value",
    spark_fns.max(
        spark_fns.expr(
            "ave_value_1" - spark_fns.min(spark_fns.collect_list("ave_value_1"))
        ),
        (
            spark_fns.expr(
                spark_fns.max(spark_fns.collect_list("ave_value_1")) - "ave_value_1"
            )
        ),
    ).over(Window.partitionBy("id", "sub_id", "sub_sub_id")),
)
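For reference, a minimal runnable sketch of that window-based idea, written against the original (ungrouped) Spark DataFrame rather than an already-aggregated one, and using spark_fns.greatest instead of the aggregate max for the element-wise comparison; the names sdf and value_1_calc are illustrative assumptions, not from the original code:

from pyspark.sql import Window
from pyspark.sql import functions as spark_fns

w = Window.partitionBy("id", "sub_id", "sub_sub_id")

# mean/min/max are computed per window partition; greatest then compares the
# two differences row by row, like np.maximum in the pandas version
windowed = sdf.withColumn(
    "value_1_calc",
    spark_fns.greatest(
        spark_fns.mean("value_1").over(w) - spark_fns.min("value_1").over(w),
        spark_fns.max("value_1").over(w) - spark_fns.mean("value_1").over(w),
    ),
)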
1 Answer
You can try doing the calculation during the aggregation, similar to what you do in the pandas code. The equivalent of np.maximum should be F.greatest. F.max is an aggregate function that takes the maximum value in a column, while F.greatest is not an aggregate function: it takes the greatest value across several columns within a single row.
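A minimal sketch of what that could look like, assuming the Spark DataFrame is built from the same data list as the pandas example; the result column names (value_1_mean, value_1_calc, etc.) are illustrative:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data, ["id", "sub_id", "sub_sub_id", "value_1", "value_2"])

# Build the per-column aggregation expressions: mean, std, and the
# np.maximum(mean - min, max - mean) calculation done during the aggregation
agg_exprs = []
for c in ["value_1", "value_2"]:
    agg_exprs += [
        F.mean(c).alias(f"{c}_mean"),
        F.stddev(c).alias(f"{c}_std"),
        # F.greatest compares the two aggregate results within the row, like np.maximum
        F.greatest(F.mean(c) - F.min(c), F.max(c) - F.mean(c)).alias(f"{c}_calc"),
    ]

aggregated_sdf = sdf.groupBy("id", "sub_id", "sub_sub_id").agg(*agg_exprs)
aggregated_sdf.show()

This avoids a separate Window pass entirely: min, max, and mean are all aggregate functions, so the whole calculation can happen in a single groupBy/agg.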