pyspark:过滤联合的重复项,只保留groupby行中指定列的最大值

jgzswidk  于 2021-05-17  发布在  Spark
关注(0)|答案(2)|浏览(788)

我想创建一个dataframe,它包含两个dataframe中的所有行,如果有重复项,我们只保留一个列的最大值的行。
例如,如果我们有两个具有相同模式的表,如下面所示,我们将合并到一个表中,该表只包含由另一列分组的行组中具有最大列值(最高分数)的行(在下面的示例中为“name”)。

  1. Table A
  2. +--------------------------+
  3. | name | source | score |
  4. +--------+---------+-------+
  5. | Finch | Acme | 62 |
  6. | Jones | Acme | 30 |
  7. | Lewis | Acme | 59 |
  8. | Smith | Acme | 98 |
  9. | Starr | Acme | 87 |
  10. +--------+---------+-------+
  11. Table B
  12. +--------------------------+
  13. | name | source | score |
  14. +--------+---------+-------+
  15. | Bryan | Beta | 93 |
  16. | Jones | Beta | 75 |
  17. | Lewis | Beta | 59 |
  18. | Smith | Beta | 64 |
  19. | Starr | Beta | 81 |
  20. +--------+---------+-------+
  21. Final Table
  22. +--------------------------+
  23. | name | source | score |
  24. +--------+---------+-------+
  25. | Bryan | Beta | 93 |
  26. | Finch | Acme | 62 |
  27. | Jones | Beta | 75 |
  28. | Lewis | Acme | 59 |
  29. | Smith | Acme | 98 |
  30. | Starr | Acme | 87 |
  31. +--------+---------+-------+

以下是似乎有效的方法:

  1. from pyspark.sql import functions as F
  2. schema = ["name", "source", "score"]
  3. rows1 = [("Smith", "Acme", 98),
  4. ("Jones", "Acme", 30),
  5. ("Finch", "Acme", 62),
  6. ("Lewis", "Acme", 59),
  7. ("Starr", "Acme", 87)]
  8. rows2 = [("Smith", "Beta", 64),
  9. ("Jones", "Beta", 75),
  10. ("Bryan", "Beta", 93),
  11. ("Lewis", "Beta", 59),
  12. ("Starr", "Beta", 81)]
  13. df1 = spark.createDataFrame(rows1, schema)
  14. df2 = spark.createDataFrame(rows2, schema)
  15. df_union = df1.unionAll(df2)
  16. df_agg = df_union.groupBy("name").agg(F.max("score").alias("score"))
  17. df_final = df_union.join(df_agg, on="score", how="leftsemi").orderBy("name", F.col("score").desc()).dropDuplicates(["name"])

以上结果是我所期望的Dataframe。这似乎是一个复杂的方式来做这件事,但我不知道,因为我是相对新的Spark。这能以一种更有效、更优雅或更“Python式”的方式来实现吗?

2nc8po8w

2nc8po8w1#

我看不出你的答案有什么错,除了最后一行——你不能只在分数上加入,但需要在“name”和“score”的组合上加入,你可以选择inner join,这样就不需要删除同名分数较低的行:

  1. df_final = (df_union.join(df_agg, on=["name", "score"], how="inner")
  2. .orderBy("name")
  3. .dropDuplicates(["name"]))

请注意,不需要按分数排序,只有当您希望避免为name=lewis显示两行时才需要.dropduplicates([“name”]),因为name=lewis在两个Dataframe中的分数相同。

flvlnr44

flvlnr442#

可以使用窗口函数。分区依据 name 选择最高的记录 score .

  1. from pyspark.sql.functions import *
  2. from pyspark.sql.window import Window
  3. w=Window().partitionBy("name").orderBy(desc("score"))
  4. df_union.withColumn("rank", row_number().over(w))\
  5. .filter(col("rank")==1).drop("rank").show()
  6. +-----+------+-----+
  7. | name|source|score|
  8. +-----+------+-----+
  9. |Bryan| Beta| 93|
  10. |Finch| Acme| 62|
  11. |Jones| Beta| 75|
  12. |Lewis| Acme| 59|
  13. |Smith| Acme| 98|
  14. |Starr| Acme| 87|
  15. +-----+------+-----+
展开查看全部

相关问题