scala spark使用窗口函数查找最大值

k0pti3hp  于 2021-05-22  发布在  Spark
关注(0)|答案(2)|浏览(612)

我的数据集如下所示:

  1. +------------------------|-----+
  2. | timestamp| zone|
  3. +------------------------+-----+
  4. | 2019-01-01 00:05:00 | A|
  5. | 2019-01-01 00:05:00 | A|
  6. | 2019-01-01 00:05:00 | B|
  7. | 2019-01-01 01:05:00 | C|
  8. | 2019-01-01 02:05:00 | B|
  9. | 2019-01-01 02:05:00 | B|
  10. +------------------------+-----+

每小时我都需要计算哪个区域的行数最多,最后得到一个如下所示的表:

  1. +-----|-----+-----+
  2. | hour| zone| max |
  3. +-----+-----+-----+
  4. | 0| A| 2|
  5. | 1| C| 1|
  6. | 2| B| 2|
  7. +-----+-----+-----+

我的指令说,我需要使用窗口功能和“分组方式”来找到我的最大计数。
我试过一些方法,但我不确定是否接近。任何帮助都将不胜感激。

vm0i2vca

vm0i2vca1#

你可以用 Windowing functions 以及 group by 使用Dataframe。
在你的情况下,你可以 rank() over(partition by) 窗口功能。

  1. import org.apache.spark.sql.function._
  2. // first group by hour and zone
  3. val df_group = data_tms.
  4. select(hour(col("timestamp")).as("hour"), col("zone"))
  5. .groupBy(col("hour"), col("zone"))
  6. .agg(count("zone").as("max"))
  7. // second rank by hour order by max in descending order
  8. val df_rank = df_group.
  9. select(col("hour"),
  10. col("zone"),
  11. col("max"),
  12. rank().over(Window.partitionBy(col("hour")).orderBy(col("max").desc)).as("rank"))
  13. // filter by col rank = 1
  14. df_rank
  15. .select(col("hour"),
  16. col("zone"),
  17. col("max"))
  18. .where(col("rank") === 1)
  19. .orderBy(col("hour"))
  20. .show()
  21. /*
  22. +----+----+---+
  23. |hour|zone|max|
  24. +----+----+---+
  25. | 0| A| 2|
  26. | 1| C| 1|
  27. | 2| B| 2|
  28. +----+----+---+
  29. * /
展开查看全部
rbl8hiat

rbl8hiat2#

您可以使用两个后续窗口函数来获得结果:

  1. df
  2. .withColumn("hour",hour($"timestamp"))
  3. .withColumn("cnt",count("*").over(Window.partitionBy($"hour",$"zone")))
  4. .withColumn("rnb",row_number().over(Window.partitionBy($"hour").orderBy($"cnt".desc)))
  5. .where($"rnb"===1)
  6. .select($"hour",$"zone",$"cnt".as("max"))

相关问题