如何用pyspark中的平均值替换异常值?

xlpyo6sf  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(451)

我想知道如何用平均值代替离群值。我有dataframe,我可以找到异常值并过滤行,现在我想用平均值替换它。我该怎么做?
df类似于:

a     b
1      27    0
2      10    1
3      80    2
4      21    3
5      46    4
6      100   5

在找到iqr后,我得到了如下异常值:

Upper = 75
lower = 12
outliers = df.filter((df['a'] > upper) | (df['a'] < lower))
2      10    1
3      80    2
6      100   5

现在我发现平均值:

from pyspark.sql.functions import mean as _mean, col
mean= df.select(_mean(col('a')).alias('mean')).collect()
mean = mean[0]['mean']
mean : 31.333

现在我不明白如何将均值四舍五入到31,并用pyspark中的异常值替换它。

kdfy810k

kdfy810k1#

你可以用 when 使用给定条件替换异常值。要替换为平均值,可以使用 mean 窗口函数,而不是将其收集到一个变量中,并使用 F.round :

from pyspark.sql import functions as F, Window

upper = 75
lower = 12

df2 = df.withColumn(
    'a', 
    F.when(
        (df['a'] > upper) | (df['a'] < lower), 
        F.round(F.mean('a').over(Window.orderBy(F.lit(1)))).cast('int')
        # or you can use 
        # F.round(F.lit(df.select(F.mean(F.col('a')).alias('mean')).collect()[0]['mean'])).cast('int')
    ).otherwise(F.col('a'))
)

df2.show()
+---+---+
|  a|  b|
+---+---+
| 27|  0|
| 47|  1|
| 47|  2|
| 21|  3|
| 46|  4|
| 47|  5|
+---+---+

相关问题