我已经在Spark中使用Window成功创建了一个row_number()
partitionBy
by,但是我想按降序排序,而不是默认的升序。
from pyspark import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
data_cooccur.select("driver", "also_item", "unit_count",
F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count")).alias("rowNum")).show()
结果是:
+------+---------+----------+------+
|driver|also_item|unit_count|rowNum|
+------+---------+----------+------+
| s10| s11| 1| 1|
| s10| s13| 1| 2|
| s10| s17| 1| 3|
在这里我添加了desc()来降序排序:
data_cooccur.select("driver", "also_item", "unit_count", F.rowNumber().over(Window.partitionBy("driver").orderBy("unit_count").desc()).alias("rowNum")).show()
并得到以下错误:
属性错误:'WindowSpec'对象没有属性'desc'
我做错了什么?
5条答案
按热度按时间2w2cym1i1#
desc
应应用于列而不是窗口定义。您可以对列使用以下任一方法:或独立函数:
mu0hgdu02#
或者您可以使用Spark-SQL中的SQL代码:
wh6knrhe3#
更新事实上,我试着深入调查,而且似乎不起作用。(实际上它抛出了一个错误)。它不工作的原因是我在Databricks中对
display()
的调用下有这段代码(display()
调用之后的代码永远不会运行)。看起来 Dataframe 上的orderBy()
和window
上的orderBy()
实际上并不相同。我将保留此答案,仅用于否定确认从PySpark 2.4开始(可能更早),只需在
orderBy
调用中添加关键字ascending=False
就可以了。前。
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy("count", ascending=False)))
和
personal_recos.withColumn("row_number", F.row_number().over(Window.partitionBy("COLLECTOR_NUMBER").orderBy(F.col("count").desc())))
似乎给予了我同样的行为。
fykwrbwg4#
edqdpe6u5#
另一个类似于@zero333的
col
选项的选项是对列进行排序。