groupby列和筛选pyspark中具有最大值的行

nbysray5  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(735)

我几乎可以肯定这个问题以前被问过,但是通过stackoverflow搜索并没有回答我的问题。不是[2]的副本,因为我想要的是最大值,而不是最频繁的项。我是pyspark的新手,尝试做一些非常简单的事情:我想按列“a”分组,然后只保留列“b”中每个组的最大值行。这样地:

df_cleaned = df.groupBy("A").agg(F.max("B"))

不幸的是,这会丢弃所有其他列-dfïu cleaned只包含列“a”和最大值b。如何保留行?(“a、b、c……)

nhjlsmyf

nhjlsmyf1#

另一种可能的方法是应用joinDataframe,并将其自身指定为“leftsemi”。这种联接在左侧包含Dataframe中的所有列,在右侧不包含任何列。
例如:

import pyspark.sql.functions as f
data = [
    ('a', 5, 'c'),
    ('a', 8, 'd'),
    ('a', 7, 'e'),
    ('b', 1, 'f'),
    ('b', 3, 'g')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+

通过执行以下操作可以选择b列和a列的最大值:

df.groupBy('A').agg(f.max('B')
+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+

将此表达式用作左半联接中的右侧,并重命名获得的列 max(B) 回到原来的名字 B ,我们可以得到所需的结果:

df.join(df.groupBy('A').agg(f.max('B').alias('B')),on='B',how='leftsemi').show()
+---+---+---+
|  B|  A|  C|
+---+---+---+
|  3|  b|  g|
|  8|  a|  d|
+---+---+---+

这个解决方案背后的物理方案和公认答案中的物理方案是不同的,我仍然不清楚哪一个方案在大型Dataframe上性能更好。
使用spark sql语法可以获得相同的结果,方法如下:

df.registerTempTable('table')
q = '''SELECT *
FROM table a LEFT SEMI
JOIN (
    SELECT 
        A,
        max(B) as max_B
    FROM table
    GROUP BY A
    ) t
ON a.A=t.A AND a.B=t.max_B
'''
sqlContext.sql(q).show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+
jum4pzuy

jum4pzuy2#

你不需要一个 udf 使用 Window .
考虑以下示例:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()

# +---+---+

# |  A|  B|

# +---+---+

# |  a|  5|

# |  a|  8|

# |  a|  7|

# |  b|  1|

# |  b|  3|

# +---+---+

创建 Window 按列划分 A 用这个来计算每组的最大值。然后过滤出行,使列中的值 B 等于最大值。

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()

# +---+---+

# |  A|  B|

# +---+---+

# |  a|  8|

# |  b|  3|

# +---+---+

或等效使用 pyspark-sql :

df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()

# +---+---+

# |  A|  B|

# +---+---+

# |  b|  3|

# |  a|  8|

# +---+---+

相关问题