我有一条小溪
+------+-------------------+------+
|group | time| label|
+------+-------------------+------+
| a|2020-01-01 10:49:00|red |
| a|2020-01-01 10:51:00|yellow|
| a|2020-01-01 12:49:00|blue |
| b|2020-01-01 12:44:00|red |
| b|2020-01-01 12:46:00|blue |
| c|2020-01-01 12:46:00|green |
+------+-------------------+------+
我想使用spark流来为每个组保留最近的时间。
对于sparkDataframe,我将使用窗口函数作为
val window = {
Window
.partitionBy("group")
.orderBy($"time".desc)
}
df
.withColumn("rn",row_number.over(window))
.filter("rn = 1")
.drop("rn")
.show()
或者
df
.orderBy($"time".desc)
.dropDuplicates("group")
在spark streaming中执行相同操作的最佳方法是什么?如何以只存储最新解决方案的方式保存结果?
更新:我试图保持每个组只有一行与最近的时间。有没有可能将有状态转换用于 mapGroupsWithState
为了这个目的?
1条答案
按热度按时间6l7fqoea1#
在spark结构化流媒体中进行聚合时,需要首先定义
Window
. 通过此窗口操作,您可以定义计算聚合的时间间隔(“最大时间,按列“group”分组”)。假设您计划在5分钟(非滑动)窗口内获得最大时间,那么您将定义:
需要注意的是,最大值上的聚合只适用于数值,因此,转换为
unix_timestamp
如上图所示。根据输出模式,您可以选择
update
模式仅获取组的更新。确保您的输出接收器(如控制台或数据库)能够处理更新,而不是创建重复项。