flink流:为每个时间窗口获取前n个元素

mnemlml8  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(325)

我有数据流的元组(float,string),我想为每个时间窗口(fixed)排序并选取三个最大的值。数据流的窗口化是通过处理时间和按自然顺序排序来实现的。
使用flink1.0.1,下面是我的试用

val topTasks = new mutable.PriorityQueue[(Float, String)](Ordering.Tuple2.reverse) //Ex:(5250, "mytask")
          //Get stream and other operations ...
          val sortMetricStream = metricStream
                       .map { metrics =>
                         topTasks.enqueue(metrics._1, metrics._2)
                       }
                       .timeWindowAll(Time.seconds(10))
                       .reduce({ (topTasks.dequeue()._2, topTasks.dequeue()._2, topTasks.dequeue()._2)
                       })

val sortMetricStream = metricStream
                       .timeWindowAll(Time.seconds(10))
                       .partitionByRange(0)
                       .sortPartition(0, Order.DESCENDING)

在任何一个sortmetricstream中,都没有给我预期的任务名称。
在此方面的任何帮助都将不胜感激。

wj8zmpe1

wj8zmpe11#

使用 apply(...) 而不是 reduce(...) (见https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#datastream-转换)
通过使用 WindowFunction#apply() 您可以在内部缓冲窗口的所有记录(例如在列表中),然后排序(列表),最后生成结果。你可以打电话 Collector#collect() 0、1或倍数。

相关问题