pandas: How to filter the highest and lowest rank within a group in PySpark?

bnl4lu3b · posted 2022-12-09 in Spark

This is my input:

id      year  month date  hour  minute  rank
54807   2021     12   31     6      29  1.0
54807   2021     12   31     6      31  2.0
54807   2021     12   31     7      15  1.0
54807   2021     12   31     7      18  2.0
54807   2021     12   31     7      30  3.0

Here is the pandas code:

df.loc[
    df.groupby(["id", "hour"])["rank"]
      .agg(["idxmin", "idxmax"])
      .stack()
].sort_index()
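
For reference, each step of that chain produces the following (a self-contained sketch using the sample input above with its default integer index; idx and out are illustrative names):

import pandas as pd

df = pd.DataFrame({
    "id": [54807] * 5,
    "year": [2021] * 5,
    "month": [12] * 5,
    "date": [31] * 5,
    "hour": [6, 6, 7, 7, 7],
    "minute": [29, 31, 15, 18, 30],
    "rank": [1.0, 2.0, 1.0, 2.0, 3.0],
})

# agg(["idxmin", "idxmax"]) returns, per (id, hour) group, the row
# labels of the min- and max-rank rows:
#             idxmin  idxmax
# id    hour
# 54807 6          0       1
#       7          2       4
idx = df.groupby(["id", "hour"])["rank"].agg(["idxmin", "idxmax"])

# stack() flattens the two columns into one Series of row labels
# (0, 1, 2, 4); .loc selects those rows, and sort_index() restores
# the original row order.
out = df.loc[idx.stack()].sort_index()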

This is my output, which I want to reproduce in PySpark:

id      year  month date  hour  minute  rank
54807   2021     12   31     6      29  1.0
54807   2021     12   31     6      31  2.0
54807   2021     12   31     7      15  1.0
54807   2021     12   31     7      30  3.0

ct2axkht #1

Spark has no row index. You can use monotonically_increasing_id to create IDs that are increasing and unique (but not guaranteed to be consecutive).
Then group by ["id", "hour"] and aggregate the min and max IDs into an array.
Finally, join back on ["id", "hour"] and keep only the records whose ID appears in the min-max ID array.

df = spark.createDataFrame(
    data=[
        [54807, 2021, 12, 31, 6, 29, 1.0],
        [54807, 2021, 12, 31, 6, 31, 2.0],
        [54807, 2021, 12, 31, 7, 15, 1.0],
        [54807, 2021, 12, 31, 7, 18, 2.0],
        [54807, 2021, 12, 31, 7, 30, 3.0],
    ],
    schema=["id", "year", "month", "date", "hour", "minute", "rank"],
)

import pyspark.sql.functions as F
df = df.withColumn("mono_inc_id", F.monotonically_increasing_id())

# collect the min and max generated row ID per (id, hour) group into an array
df_grp = df.groupBy("id", "hour") \
    .agg(
        F.array(
            F.min("mono_inc_id"),
            F.max("mono_inc_id")
        ).alias("min_max_id")
    )
df_grp.show()
+-----+----+----------+
|   id|hour|min_max_id|
+-----+----+----------+
|54807|   7|    [2, 4]|
|54807|   6|    [0, 1]|
+-----+----+----------+

df = df.join(df_grp, on=["id", "hour"]) \
       .filter(F.array_contains("min_max_id", F.col("mono_inc_id"))) \
       .drop("mono_inc_id", "min_max_id")

df.show()
+-----+----+----+-----+----+------+----+
|   id|hour|year|month|date|minute|rank|
+-----+----+----+-----+----+------+----+
|54807|   7|2021|   12|  31|    15| 1.0|
|54807|   7|2021|   12|  31|    30| 3.0|
|54807|   6|2021|   12|  31|    29| 1.0|
|54807|   6|2021|   12|  31|    31| 2.0|
+-----+----+----+-----+----+------+----+
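
One caveat: monotonically_increasing_id follows the DataFrame's physical row order, not the rank values, so min/max of the ID matches pandas' idxmin/idxmax here only because each group's rows already arrive sorted by rank. If that ordering isn't guaranteed, a sketch of the aggregation step using min_by/max_by (requires Spark >= 3.3; not part of the original answer) ties the selected IDs to rank directly:

# Pick the ID of the row holding the smallest / largest rank per group,
# independent of physical row order (Spark >= 3.3).
df_grp = df.groupBy("id", "hour") \
    .agg(
        F.array(
            F.min_by("mono_inc_id", "rank"),
            F.max_by("mono_inc_id", "rank")
        ).alias("min_max_id")
    )

The join and filter steps afterwards stay unchanged.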

wooyq4lh #2

You can use window functions:

# rank is stored as a float so that min/max compare numerically
# rather than lexicographically as strings
df = spark.createDataFrame(
    [
        ('54807', '2021', '12', '31', '6', '29', 1.0),
        ('54807', '2021', '12', '31', '6', '31', 2.0),
        ('54807', '2021', '12', '31', '7', '15', 1.0),
        ('54807', '2021', '12', '31', '7', '18', 2.0),
        ('54807', '2021', '12', '31', '7', '30', 3.0),
    ],
    ['id', 'year', 'month', 'date', 'hour', 'minute', 'rank'],
)

from pyspark.sql import Window
import pyspark.sql.functions as F

# one window partition per (id, hour) group; no ordering is needed
# because we only take min/max over the whole partition
w = Window.partitionBy('id', 'hour')

df \
    .withColumn('max_rank', F.max('rank').over(w)) \
    .withColumn('min_rank', F.min('rank').over(w)) \
    .filter((F.col('rank') == F.col('max_rank')) | (F.col('rank') == F.col('min_rank'))) \
    .drop('min_rank', 'max_rank') \
    .show()

# +-----+----+-----+----+----+------+----+
# |   id|year|month|date|hour|minute|rank|
# +-----+----+-----+----+----+------+----+
# |54807|2021|   12|  31|   6|    29| 1.0|
# |54807|2021|   12|  31|   6|    31| 2.0|
# |54807|2021|   12|  31|   7|    15| 1.0|
# |54807|2021|   12|  31|   7|    30| 3.0|
# +-----+----+-----+----+----+------+----+
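
Unlike the pandas version, which ends with sort_index(), Spark gives no ordering guarantee on the result. To get a deterministic output order (a small addition, not in the original answer), append an explicit sort before showing:

# Same pipeline as above, with orderBy appended so the output order
# is stable and mirrors pandas' sort_index().
df \
    .withColumn('max_rank', F.max('rank').over(w)) \
    .withColumn('min_rank', F.min('rank').over(w)) \
    .filter((F.col('rank') == F.col('max_rank')) | (F.col('rank') == F.col('min_rank'))) \
    .drop('min_rank', 'max_rank') \
    .orderBy('id', 'hour', 'minute') \
    .show()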
