如何在pyspark中动态聚合列

jobtbby3  于 2022-12-11  发布在  Spark
关注(0)|答案(2)|浏览(143)

我想计算每个输入列的非缺失值pct_<original_name>_valid的百分比。在本例中只有2列,因此很容易手动编写下面的代码。但是当有30多列时,我不想手动执行此操作。是否可以动态执行此操作?(例如,将列名列表作为输入)

import pyspark.sql.functions as F

d = [{'name': 'Alice', 'age': 1}, {'name': 'Bae', 'age': None}]
df = spark.createDataFrame(d)

df.withColumn('name_valid', F.when(col("name").isNotNull(),1).otherwise(0))\
.withColumn('age_valid', F.when(col("age").isNotNull(),1).otherwise(0))\
.agg(
    (100.0*F.sum(col("name_valid"))/F.count(F.lit(1))).alias("pct_name_valid"),
    (100.0*F.sum(col("age_valid"))/F.count(F.lit(1))).alias("pct_age_valid")
)\
.show()

结果如下:

+--------------+-------------+
|pct_name_valid|pct_age_valid|
+--------------+-------------+
|         100.0|         50.0|
+--------------+-------------+

如前所述,我不想手动为所有30多列执行此操作。是否有任何方法可以执行以下操作:

my_output = calculate_non_missing_percentage(df, my_columns = ["name", "age", "gender", "school", "color"])
yqkkidmi

yqkkidmi1#

您可以使用数据行的名称来动态汇总数据行。

cols = df.columns

# transform null values in 0, else 1
df = df.select(
    *(
        F.when(
            F.col(col).isNull(),
            0
        ).otherwise(1).alias(col)
        for col
        in cols
    )
)

# percentage of non-missing value
df = df.agg(
    *(
        (F.sum(col)/F.count(col)).alias('{}_ratio'.format(col))
        for col
        in cols
    )
)

df.show()                                                                                                       
+---------+----------+
|age_ratio|name_ratio|
+---------+----------+
|      0.5|       1.0|
+---------+----------+
ecfsfe2w

ecfsfe2w2#

在代码中动态查找空值的方法如下:

from pyspark.sql.functions import isnan, when, count

total_count = df.count()
null_values = df.select(
    [(count(when(isnan(c), c)) / total_count).alias(c) for c in df.columns]
)

# Another way to do it is (ref neobot)
null_values = df.select(
    [(sum(when(isnull(c), 1).otherwise(0)) / total_count).alias(c) for c in df.columns]
)

诀窍在于事先创建列表。列出要应用于列的函数,然后将列表传递给select。
我使用这个函数来计算数据中的非重复值:
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)(假设数据行是字串数据行,此处未放置该条件)

相关问题