pyspark Spark SQL Row_number()分区方式排序说明

mzaanser  于 2022-11-01  发布在  Spark
关注(0)|答案(5)|浏览(385)

我已经在Spark中使用Window成功创建了一个row_number()partitionBy by,但是我想按降序排序,而不是默认的升序。

from pyspark import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window

data_cooccur.select("driver", "also_item", "unit_count", 
    F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count")).alias("rowNum")).show()

结果是:

+------+---------+----------+------+
 |driver|also_item|unit_count|rowNum|
 +------+---------+----------+------+
 |   s10|      s11|         1|     1|
 |   s10|      s13|         1|     2|
 |   s10|      s17|         1|     3|

在这里我添加了desc()来降序排序:

data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count").desc()).alias("rowNum")).show()

并得到以下错误:
属性错误:'WindowSpec'对象没有属性'desc'
我做错了什么?

2w2cym1i

2w2cym1i1#

desc应应用于列而不是窗口定义。您可以对列使用以下任一方法:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

F.row_number().over(
    Window.partitionBy("driver").orderBy(col("unit_count").desc())
)

或独立函数:

from pyspark.sql.functions import desc
from pyspark.sql.window import Window

F.row_number().over(
    Window.partitionBy("driver").orderBy(desc("unit_count"))
)
mu0hgdu0

mu0hgdu02#

或者您可以使用Spark-SQL中的SQL代码:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .appName('Test')\
    .getOrCreate()

spark.sql("""
    select driver
        ,also_item
        ,unit_count
        ,ROW_NUMBER() OVER (PARTITION BY driver ORDER BY unit_count DESC) AS rowNum
    from data_cooccur
""").show()
wh6knrhe

wh6knrhe3#

更新事实上,我试着深入调查,而且似乎不起作用。(实际上它抛出了一个错误)。它不工作的原因是我在Databricks中对display()的调用下有这段代码(display()调用之后的代码永远不会运行)。看起来 Dataframe 上的orderBy()window上的orderBy()实际上并不相同。我将保留此答案,仅用于否定确认

从PySpark 2.4开始(可能更早),只需在orderBy调用中添加关键字ascending=False就可以了。
前。
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)))

personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())))
似乎给予了我同样的行为。

fykwrbwg

fykwrbwg4#

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

my_new_df = df.select(df["STREET NAME"]).distinct()

# Count the rows in my_new_df

print("\nThere are %d rows in the my_new_df DataFrame.\n" % my_new_df .count())

# Add a ROW_ID

my_new_df = my_new_df .withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set and sort DESC

my_new_df .orderBy(my_new_df .ROW_ID.desc()).show(10)
edqdpe6u

edqdpe6u5#

另一个类似于@zero333的col选项的选项是对列进行排序。

data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy(data_cooccur["unit_count"].desc())).alias("rowNum")).show()

相关问题