基于pyspark中的分类列值计数创建具有运行计数的新列

9ceoxa92  于 2023-06-21  发布在  Spark
关注(0)|答案(2)|浏览(99)

假设一个给定的dataframe:
| 型号|颜色|
| - -----|- -----|
| 汽车|红色|
| 汽车|红色|
| 汽车|蓝色|
| 卡车|红色|
| 卡车|蓝色|
| 卡车|黄色|
| SUV|蓝色|
| SUV|蓝色|
| 汽车|蓝色|
| 汽车|黄色|
我想添加颜色列,以保持每个模型中每种颜色的计数,以给予以下 Dataframe :
| 型号|颜色|红色|蓝色|黄色|
| - -----|- -----|- -----|- -----|- -----|
| 汽车|红色|2| 2| 1|
| 汽车|红色|2| 2| 1|
| 汽车|蓝色|2| 2| 1|
| 卡车|红色|1| 1| 1|
| 卡车|蓝色|1| 1| 1|
| 卡车|黄色|1| 1| 1|
| SUV|蓝色|0| 2| 0|
| SUV|蓝色|0| 2| 0|
| 汽车|蓝色|2| 2| 1|
| 汽车|黄色|2| 2| 1|
这个数据集有数十亿条记录,所以我尽量远离UDF,如果可能的话,我更喜欢使用内置的方法。
我通常使用带有.size()和.collect_set()的窗口函数来计算这种类型的数据,但是基于不同的列类别添加多个不同的新df列会给我带来问题,因为我不确定是否需要通过添加额外的窗口分区或.where()或isin()方法来隔离单个类别。任何反馈或建议都很感激。谢谢你。

1hdlvixo

1hdlvixo1#

让我们用window函数和内置的PySparkDataFrame函数来做这件事,window在计算上可能非常昂贵,特别是对于大数据集,所以也许可以为你的方法寻找一种更好的方法。不要忘记将datadf替换为实际的dataDataFrame

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as _sum

spark = SparkSession.builder.getOrCreate()

data = [("Car","Red"), ("Car","Red"), ("Car","Blue"), ("Truck","Red"), 
        ("Truck","Blue"), ("Truck","Yellow"), ("SUV","Blue"), 
        ("SUV","Blue"), ("Car","Blue"), ("Car","Yellow")]

df = spark.createDataFrame(data, ["Model", "Color"])

window_model = Window.partitionBy('Model')

df = df.withColumn('Red', _sum((col('Color') == 'Red').cast('int')).over(window_model))
df = df.withColumn('Blue', _sum((col('Color') == 'Blue').cast('int')).over(window_model))
df = df.withColumn('Yellow', _sum((col('Color') == 'Yellow').cast('int')).over(window_model))

df.show()
e0bqpujr

e0bqpujr2#

如果您不关心保留原始顺序,可以使用一行程序完成:

data = [("Car","Red"), ("Car","Red"), ("Car","Blue"), ("Truck","Red"), 
        ("Truck","Blue"), ("Truck","Yellow"), ("SUV","Blue"), 
        ("SUV","Blue"), ("Car","Blue"), ("Car","Yellow")]

df = spark.createDataFrame(data, ["Model", "Color"])

df.join(df.groupBy("Model").pivot("Color").count().fillna(0), on='Model').show()

# +-----+------+----+---+------+
# |Model| Color|Blue|Red|Yellow|
# +-----+------+----+---+------+
# |  SUV|  Blue|   2|  0|     0|
# |  SUV|  Blue|   2|  0|     0|
# |  Car|   Red|   2|  2|     1|
# |  Car|   Red|   2|  2|     1|
# |  Car|  Blue|   2|  2|     1|
# |  Car|  Blue|   2|  2|     1|
# |  Car|Yellow|   2|  2|     1|
# |Truck|   Red|   1|  1|     1|
# |Truck|  Blue|   1|  1|     1|
# |Truck|Yellow|   1|  1|     1|
# +-----+------+----+---+------+

如果您关心顺序,它仍然可以作为一行程序完成:

import pyspark.sql.functions as F

df.withColumn("_id", F.monotonically_increasing_id())\
    .join(df.groupBy("Model").pivot("Color").count().fillna(0), on='Model')\
    .orderBy("_id").drop("_id").show()

# +-----+------+----+---+------+
# |Model| Color|Blue|Red|Yellow|
# +-----+------+----+---+------+
# |  Car|   Red|   2|  2|     1|
# |  Car|   Red|   2|  2|     1|
# |  Car|  Blue|   2|  2|     1|
# |Truck|   Red|   1|  1|     1|
# |Truck|  Blue|   1|  1|     1|
# |Truck|Yellow|   1|  1|     1|
# |  SUV|  Blue|   2|  0|     0|
# |  SUV|  Blue|   2|  0|     0|
# |  Car|  Blue|   2|  2|     1|
# |  Car|Yellow|   2|  2|     1|
# +-----+------+----+---+------+

相关问题