生成的Dataframe如下:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
如您所见,Dataframe是按 Hour
以递增的顺序,然后 TotalValue
按降序排列。
我想选择每组的第一行,即。
从小时组==0中选择(0,cat26,30.9)
从小时组==1中选择(1,cat67,28.5)
从小时组==2中选择(2,cat56,39.6)
等等
所以期望的输出是:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 1| cat67| 28.5|
| 2| cat56| 39.6|
| 3| cat8| 35.6|
| ...| ...| ...|
+----+--------+----------+
也可以方便地选择每组的前n行。
非常感谢您的帮助。
8条答案
按热度按时间qni6mghb1#
使用dataframeapi的一个好方法是使用argmax逻辑,如下所示
nhjlsmyf2#
下面的解决方案只执行一次groupby并在一次快照中提取包含maxvalue的Dataframe行。不需要进一步的连接或窗口。
ppcbkaq53#
你可以这样做-
xesrikrc4#
模式是按键分组=>对每个组执行一些操作,例如reduce=>返回Dataframe
在这种情况下,我认为Dataframe抽象有点麻烦,所以我使用了rdd功能
zed5wv105#
这与zero323的答案完全相同,但采用sql查询方式。
假设dataframe被创建并注册为
窗口功能:
纯sql聚合,后跟联接:
使用结构上的排序:
数据集的方式和不dos与原始答案相同
vvppvyoh6#
窗口功能:
像这样的事情应该可以做到:
这种方法在数据严重倾斜的情况下效率很低。
纯sql聚合,后跟
join
:或者,您可以加入聚合Dataframe:
它将保留重复的值(如果每小时有多个类别具有相同的总值)。您可以按如下方式删除它们:
使用排序结束
structs
:整洁,虽然没有很好的测试,技巧不需要连接或窗口函数:
使用数据集api(spark 1.6+、2.0+):
Spark1.6:
spark 2.0或更高版本:
最后两种方法可以利用map-side-combine,并且不需要完全洗牌,因此大多数情况下,与窗口函数和连接相比,应该表现出更好的性能。这些命令也可用于结构化流式输入
completed
输出模式。不要使用:
这似乎是可行的(尤其是在
local
模式),但它是不可靠的(见Spark-16207,信贷tzach-zohar链接相关的jira问题,和Spark-30335)。同样的注解也适用于
内部使用等效的执行计划。
9rnv2umw7#
对于按多列分组的spark 2.0.2:
jyztefdp8#
我们可以使用rank()窗口函数(您可以选择rank=1)rank只为组的每一行添加一个数字(在本例中是小时)
举个例子从https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank )