我有数据流的元组(float,string),我想为每个时间窗口(fixed)排序并选取三个最大的值。数据流的窗口化是通过处理时间和按自然顺序排序来实现的。
使用flink1.0.1,下面是我的试用
val topTasks = new mutable.PriorityQueue[(Float, String)](Ordering.Tuple2.reverse) //Ex:(5250, "mytask")
//Get stream and other operations ...
val sortMetricStream = metricStream
.map { metrics =>
topTasks.enqueue(metrics._1, metrics._2)
}
.timeWindowAll(Time.seconds(10))
.reduce({ (topTasks.dequeue()._2, topTasks.dequeue()._2, topTasks.dequeue()._2)
})
和
val sortMetricStream = metricStream
.timeWindowAll(Time.seconds(10))
.partitionByRange(0)
.sortPartition(0, Order.DESCENDING)
在任何一个sortmetricstream中,都没有给我预期的任务名称。
在此方面的任何帮助都将不胜感激。
1条答案
按热度按时间wj8zmpe11#
使用
apply(...)
而不是reduce(...)
(见https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#datastream-转换)通过使用
WindowFunction#apply()
您可以在内部缓冲窗口的所有记录(例如在列表中),然后排序(列表),最后生成结果。你可以打电话Collector#collect()
0、1或倍数。