spark流:保留组中的最新值

rkttyhzu  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(512)

我有一条小溪

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|red   |
|     a|2020-01-01 10:51:00|yellow|
|     a|2020-01-01 12:49:00|blue  |
|     b|2020-01-01 12:44:00|red   |
|     b|2020-01-01 12:46:00|blue  |
|     c|2020-01-01 12:46:00|green |
+------+-------------------+------+

我想使用spark流来为每个组保留最近的时间。
对于sparkDataframe,我将使用窗口函数作为

val window = {
    Window
    .partitionBy("group")
    .orderBy($"time".desc)
}

df
.withColumn("rn",row_number.over(window))
.filter("rn = 1")
.drop("rn")
.show()

或者

df
.orderBy($"time".desc)
.dropDuplicates("group")

在spark streaming中执行相同操作的最佳方法是什么?如何以只存储最新解决方案的方式保存结果?
更新:我试图保持每个组只有一行与最近的时间。有没有可能将有状态转换用于 mapGroupsWithState 为了这个目的?

6l7fqoea

6l7fqoea1#

在spark结构化流媒体中进行聚合时,需要首先定义 Window . 通过此窗口操作,您可以定义计算聚合的时间间隔(“最大时间,按列“group”分组”)。
假设您计划在5分钟(非滑动)窗口内获得最大时间,那么您将定义:

val df = spark.readStream
  .format("kafka")
  [...]
  .selectExpr("CAST(value AS STRING) as group", "timestamp")

val dfGrouped = df
  .select(
    col("group"),
    col("timestamp"),
    unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss").alias("time_unix"))
  .groupBy(col("group"), window($"timestamp", "5 minutes"))
  .agg(max("time_unix").alias("max_time_unix"))
  .withColumn("time", col("max_time_unix").cast(TimestampType))
  .drop("window", "max_time_unix")

需要注意的是,最大值上的聚合只适用于数值,因此,转换为 unix_timestamp 如上图所示。
根据输出模式,您可以选择 update 模式仅获取组的更新。确保您的输出接收器(如控制台或数据库)能够处理更新,而不是创建重复项。

相关问题