Stream wordCountsStream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count")).parallelismHint(16).newValuesStream();
1条答案
按热度按时间monwx1rj1#
tridentapi提供max&maxby操作,这些操作在trident流中一批元组的每个分区上返回最大值。
因此,在计算每个单词的计数后,如下所示:
使用maxby获取具有最大计数的单词: