pandas: How to filter the highest- and lowest-ranked rows within each group in PySpark?

Asked by bnl4lu3b on 2022-12-09 in Spark · 2 answers

This is my input:

    id     year  month  date  hour  minute  rank
    54807  2021  12     31    6     29      1.0
    54807  2021  12     31    6     31      2.0
    54807  2021  12     31    7     15      1.0
    54807  2021  12     31    7     18      2.0
    54807  2021  12     31    7     30      3.0

Here is the pandas code:

    df.loc[
        df.groupby(["id", "hour"])["rank"]
          .agg(["idxmin", "idxmax"])
          .stack()
    ].sort_index()

This is my output:

    id     year  month  date  hour  minute  rank
    54807  2021  12     31    6     29      1.0
    54807  2021  12     31    6     31      2.0
    54807  2021  12     31    7     15      1.0
    54807  2021  12     31    7     30      3.0

Answer 1 (ct2axkht)

Spark has no row index. You can use monotonically_increasing_id to create an ID that is increasing and unique (but not guaranteed to be consecutive).
Then group by ["id", "hour"] and aggregate the minimum and maximum IDs per group into an array.
Finally, join back on ["id", "hour"] and keep only the records whose ID is contained in that min/max-ID array.

    import pyspark.sql.functions as F

    df = spark.createDataFrame(
        data=[[54807, 2021, 12, 31, 6, 29, 1.0],
              [54807, 2021, 12, 31, 6, 31, 2.0],
              [54807, 2021, 12, 31, 7, 15, 1.0],
              [54807, 2021, 12, 31, 7, 18, 2.0],
              [54807, 2021, 12, 31, 7, 30, 3.0]],
        schema=["id", "year", "month", "date", "hour", "minute", "rank"])

    # Assign an increasing, unique (not necessarily consecutive) row ID.
    df = df.withColumn("mono_inc_id", F.monotonically_increasing_id())

    # Per (id, hour) group, collect the smallest and largest row IDs into an array.
    df_grp = df.groupBy("id", "hour") \
               .agg(F.array(F.min("mono_inc_id"),
                            F.max("mono_inc_id")).alias("min_max_id"))
    df_grp.show()
    # +-----+----+----------+
    # |   id|hour|min_max_id|
    # +-----+----+----------+
    # |54807|   7|    [2, 4]|
    # |54807|   6|    [0, 1]|
    # +-----+----+----------+

    # Join back and keep only the rows whose ID is one of its group's min/max IDs.
    df = df.join(df_grp, on=["id", "hour"]) \
           .filter(F.array_contains("min_max_id", F.col("mono_inc_id"))) \
           .drop("mono_inc_id", "min_max_id")
    df.show()
    # +-----+----+----+-----+----+------+----+
    # |   id|hour|year|month|date|minute|rank|
    # +-----+----+----+-----+----+------+----+
    # |54807|   7|2021|   12|  31|    15| 1.0|
    # |54807|   7|2021|   12|  31|    30| 3.0|
    # |54807|   6|2021|   12|  31|    29| 1.0|
    # |54807|   6|2021|   12|  31|    31| 2.0|
    # +-----+----+----+-----+----+------+----+
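The same first-and-last-row-per-group selection can also be done without the extra join, by computing the group-wise min and max of mono_inc_id as window aggregates. A minimal sketch, not part of the original answer: it assumes df is taken right after the withColumn("mono_inc_id", ...) step above, and the name result is invented here.

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    # Group-wise min/max of the row ID, computed with a window instead of a join.
    w = Window.partitionBy("id", "hour")
    result = (
        df.withColumn("min_id", F.min("mono_inc_id").over(w))
          .withColumn("max_id", F.max("mono_inc_id").over(w))
          .filter((F.col("mono_inc_id") == F.col("min_id"))
                  | (F.col("mono_inc_id") == F.col("max_id")))
          .drop("mono_inc_id", "min_id", "max_id")
    )
    result.show()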

Answer 2 (wooyq4lh)

You can use window functions:

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    df = spark.createDataFrame(
        [
            ('54807', '2021', '12', '31', '6', '29', '1.0'),
            ('54807', '2021', '12', '31', '6', '31', '2.0'),
            ('54807', '2021', '12', '31', '7', '15', '1.0'),
            ('54807', '2021', '12', '31', '7', '18', '2.0'),
            ('54807', '2021', '12', '31', '7', '30', '3.0'),
        ],
        ['id', 'year', 'month', 'date', 'hour', 'minute', 'rank']
    )

    # Compute each (id, hour) group's max and min rank, then keep rows matching either.
    w = Window.partitionBy('id', 'hour')
    df \
        .withColumn('max_rank', F.max('rank').over(w)) \
        .withColumn('min_rank', F.min('rank').over(w)) \
        .filter((F.col('rank') == F.col('max_rank')) | (F.col('rank') == F.col('min_rank'))) \
        .drop('min_rank', 'max_rank') \
        .show()
    # +-----+----+-----+----+----+------+----+
    # |   id|year|month|date|hour|minute|rank|
    # +-----+----+-----+----+----+------+----+
    # |54807|2021|   12|  31|   6|    29| 1.0|
    # |54807|2021|   12|  31|   6|    31| 2.0|
    # |54807|2021|   12|  31|   7|    15| 1.0|
    # |54807|2021|   12|  31|   7|    30| 3.0|
    # +-----+----+-----+----+----+------+----+
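Note that this min/max filter keeps every row that ties for a group's extreme rank, whereas pandas idxmin/idxmax returns exactly one row per extreme. If a single row per extreme is required even with ties, one option is row_number over ascending and descending rank orderings. A sketch under that assumption; the names w_min, w_max, rn_min and rn_max are invented here:

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    # One row per group extreme: row_number picks a single row even when ranks tie.
    w_min = Window.partitionBy('id', 'hour').orderBy(F.col('rank').asc())
    w_max = Window.partitionBy('id', 'hour').orderBy(F.col('rank').desc())

    df \
        .withColumn('rn_min', F.row_number().over(w_min)) \
        .withColumn('rn_max', F.row_number().over(w_max)) \
        .filter((F.col('rn_min') == 1) | (F.col('rn_max') == 1)) \
        .drop('rn_min', 'rn_max') \
        .show()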
