计算PySpark DataFrame列的模式?

y1aodyip  于 2023-05-06  发布在  Spark
关注(0)|答案(7)|浏览(190)

最后,我需要的是列的模式,适用于DataFrame中的所有列。对于其他汇总统计数据,我看到了几个选项:使用DataFrame聚合,或者将DataFrame的列Map到向量的RDD(我在做这件事时也遇到了麻烦),并使用MLlib中的colStats。但我不认为时尚是一个选择。

rt4zxlrg

rt4zxlrg1#

众数的问题与中位数的问题几乎相同。虽然它很容易计算,但计算是相当昂贵的。它可以使用排序,然后使用本地和全局聚合,或者使用just-another-wordcount和filter来完成:

import numpy as np
np.random.seed(1)

df = sc.parallelize([
    (int(x), ) for x in np.random.randint(50, size=10000)
]).toDF(["x"])

cnts = df.groupBy("x").count()
mode = cnts.join(
    cnts.agg(max("count").alias("max_")), col("count") == col("max_")
).limit(1).select("x")
mode.first()[0]
## 0

无论哪种方式,都可能需要对每列进行完全 Shuffle 。

jw5wzhpr

jw5wzhpr2#

这一行将给予spark Dataframe df中“col”的模式:

df.groupby("col").count().orderBy("count", ascending=False).first()[0]

对于df中所有列的模式列表,请使用:

[df.groupby(i).count().orderBy("count", ascending=False).first()[0] for i in df.columns]

要添加名称以标识哪列的模式,请创建2D列表:

[[i,df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]
7fyelxc5

7fyelxc53#

下面的方法可以帮助您获取输入 Dataframe 的所有列的模式

from pyspark.sql.functions import monotonically_increasing_id

def get_mode(df):
    column_lst = df.columns
    res = [df.select(i).groupby(i).count().orderBy("count", ascending=False) for i in column_lst]
    df_mode = res[0].limit(1).select(column_lst[0]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
    
    for i in range(1, len(res)):
        df2 = res[i].limit(1).select(column_lst[i]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
        df_mode = df_mode.join(df2, (df_mode.temp_name_monotonically_increasing_id == df2.temp_name_monotonically_increasing_id)).drop(df2.temp_name_monotonically_increasing_id)
        
    return df_mode.drop("temp_name_monotonically_increasing_id")
a64a0gku

a64a0gku4#

您可以使用Java代码计算列模式,如下所示:

case MODE:
                Dataset<Row> cnts = ds.groupBy(column).count();
                Dataset<Row> dsMode = cnts.join(
                        cnts.agg(functions.max("count").alias("max_")),
                        functions.col("count").equalTo(functions.col("max_")
                        ));
                Dataset<Row> mode = dsMode.limit(1).select(column);
                replaceValue = ((GenericRowWithSchema) mode.first()).values()[0];
                ds = replaceWithValue(ds, column, replaceValue);
                break;

private static Dataset<Row> replaceWithValue(Dataset<Row> ds, String column, Object replaceValue) {
    return ds.withColumn(column,
            functions.coalesce(functions.col(column), functions.lit(replaceValue)));
}
x6yk4ghg

x6yk4ghg5#

>>> df=newdata.groupBy('columnName').count()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]

See My result

>>> newdata.groupBy('var210').count().show()
+------+-----+
|var210|count|
+------+-----+
|  3av_|   64|
|  7A3j|  509|
|  g5HH| 1489|
|  oT7d|  109|
|  DM_V|  149|
|  uKAI|44883|
+------+-----+

# store the above result in df
>>> df=newdata.groupBy('var210').count()
>>> df.orderBy(df['count'].desc()).collect()
[Row(var210='uKAI', count=44883),
Row(var210='g5HH', count=1489),
Row(var210='7A3j', count=509),
Row(var210='DM_V', count=149),
Row(var210='oT7d', count=109),
Row(var210='3av_', count=64)]

# get the first value using collect()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
>>> mode
'uKAI'

使用groupBy()函数获取列中每个类别的计数。df是我的结果 Dataframe ,有两列var210,count。使用orderBy(),列名为'count',按降序给予数据框第一行的最大值。collect()[0][0]用于获取 Dataframe 中的1元组

zd287kbt

zd287kbt6#

首先按列按count分组(我没有计算空值),并获得最大的count值(频繁值)。第二,查找最大计数值的键:

from pysprak.sql import functions as F

count_mode_val = df.groupBy("column_name").count().filter(F.col("column_name").isNotNull()).agg(F.max("count")).collect()[0][0]

mode_val = df.groupBy("column_name").count().filter(F.col("column_name").isNotNull()).filter(F.col("count") == count_mode_val).select("column_name").collect()[0][0]
zbwhf8kr

zbwhf8kr7#

使用UDF,因为它简单且不太复杂:-

它将适用于Categorical和Numeric数据类型

from pyspark.sql.functions import col, udf, collect_list
import statistics

# define a UDF to calculate mode
def mode_udf(data):
    if len(data) == 0:
        return None
    
    return statistics.mode(data)    # similar for mean, median.

# register the UDF
mode_func = udf(mode_udf)

# create a sample dataframe
data = [("apple", 1), ("orange", 2), ("apple", 2), ("banana", 4), ("orange", 12), ("orange", 2), ("apple", 3), ("apple", 0), ("apple", 3),("apple", 2), ("apple", 2), ("banana", 7), ("banana", 4)]
df = spark.createDataFrame(data, ["fruit", "quantity"])

# calculate the mode for the "fruit" column
mode_df = df.groupBy("fruit").agg(mode_func(collect_list("quantity")).alias("quantity_mode"))

# show the result
mode_df.show()

注意:-请处理数据中的None/Null值,否则有可能获得意外输出。

相关问题