PySpark groupBy and consolidating on multiple distinct column values

ux6nzvsh asked on 2021-05-27 in Spark
Follow (0) | Answers (2) | Views (518)

Trying to extract the record with the latest date for each distinct combination of values in columns A and B (see below).

Desired result:

Current solution:

from pyspark.sql import functions as f

# per (A, B) group: take the first C and D values and the max of E
test = df.groupBy(df['A'], df['B']).agg(f.first(df['C']), f.first(df['D']), f.max(df['E']))

Are there any pitfalls to look out for, or suggested optimizations for the above?
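
One pitfall worth illustrating: f.first() inside agg() depends on row order and is not tied to the row that holds f.max(E), so C and D may come from a different row than the latest date. Below is a minimal sketch of the issue, assuming a local SparkSession and using the sample data shown in the answers that follow.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# sample rows mirroring the data used in the answers below
df = spark.createDataFrame(
    [(12, "ERP", 1000, "M", "20200130"),
     (12, "ERP", 2000, "M", "20200228"),
     (12, "ERP", 7500, "D", "20200330"),
     (12, "ERF", 4500, "D", "20200430"),
     (12, "ERF", 4000, "L", "20200228"),
     (12, "ERF", 3400, "L", "20200330")],
    ["A", "B", "C", "D", "E"],
)

# first(C)/first(D) pick an arbitrary row per group, while max(E) is computed
# independently, so the aggregated row can mix values from different rows,
# e.g. C=1000, D=M paired with E=20200330 for the (12, ERP) group
df.groupBy("A", "B").agg(
    F.first("C").alias("C"),
    F.first("D").alias("D"),
    F.max("E").alias("E"),
).show()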


qij5mzcb1#

You can sort column E in descending order within each group and then use the row_number function to keep only the latest record.

df.show()
# +---+---+----+---+--------+
# |  A|  B|   C|  D|       E|
# +---+---+----+---+--------+
# | 12|ERP|1000|  M|20200130|
# | 12|ERP|2000|  M|20200228|
# | 12|ERP|7500|  D|20200330|
# | 12|ERF|4500|  D|20200430|
# | 12|ERF|4000|  L|20200228|
# | 12|ERF|3400|  L|20200330|
# +---+---+----+---+--------+

from pyspark.sql.functions import col, to_date, row_number
from pyspark.sql.window import Window

# number rows within each (A, B) group, newest parsed date first
w = Window.partitionBy("A", "B").orderBy(col("z").desc())

df.withColumn("z", to_date(col("E"), "yyyyMMdd")).\
    withColumn("rn", row_number().over(w)).\
    filter(col("rn") == 1).\
    drop('z', 'rn').\
    show()

# +---+---+----+---+--------+
# |  A|  B|   C|  D|       E|
# +---+---+----+---+--------+
# | 12|ERP|7500|  D|20200330|
# | 12|ERF|4500|  D|20200430|
# +---+---+----+---+--------+

z5btuh9x2#

Use the max window function to compute the latest date per group, then use it to filter.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# latest parsed date per (A, B) group
w = Window().partitionBy("A", "B")

# keep rows whose date equals the group maximum
# (if several rows share the max date, all of them are kept)
df.withColumn("max", F.max(F.to_date("E", "yyyyMMdd")).over(w))\
  .filter(F.to_date(F.col("E"), "yyyyMMdd") == F.col("max")).drop("max").show()

If the E (date) column is not of StringType, cast it to a string first:

# same approach, but cast E to string before parsing it as a date
w = Window().partitionBy("A", "B")

df.withColumn("max", F.max(F.to_date(F.col("E").cast('string'), "yyyyMMdd")).over(w))\
  .filter(F.to_date(F.col("E").cast('string'), "yyyyMMdd") == F.col("max")).drop("max").show()
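
To see which of the two snippets applies, you can inspect the column's type first; a small sketch, assuming the DataFrame is named df:

from pyspark.sql.types import StringType

# print the schema and check whether E is already a string
df.printSchema()
print(isinstance(df.schema["E"].dataType, StringType))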

Output:


# +---+---+----+---+--------+
# |  A|  B|   C|  D|       E|
# +---+---+----+---+--------+
# | 12|ERP|7500|  D|20200330|
# | 12|ERF|4500|  D|20200430|
# +---+---+----+---+--------+
