我目前正在使用烫伤进行mapreduce工作。我试图根据在typedpipe中的行中看到特定值的次数来设置阈值。例如,如果在typedpipe中有以下行:
第1列|第2列
“嗨”|“嗨”
“你好”|“你好”
“你好”|“你好”
“再见”|“再见”
我想在每行后面加上我在每行的第1列和第2列看到的频率值。这意味着输出将如下所示:
第1列|第2列|第1列频率|第2列频率
“嗨”|“嗨”| 3 | 1
“你好”|“你好”| 3 | 2
“你好”|“你好”| 3 | 2
“再见”|“再见”| 1 | 1
目前,我是通过按每列对类型化管道进行分组来实现的,如下所示:
val key2Freqs = input.groupBy('key2) {
_.size('key2Freq)
}.rename('key2 -> 'key2Right).project('key2Right, 'key2Freq);
然后将原始输入与key2freq连接,如下所示:
.joinWithSmaller('key2 -> 'key2Right, key2Freqs, joiner = new LeftJoin)
然而,这是真的很慢,在我看来是相当低效的基本上是一个相当简单的任务。它变得特别长b/c我有6个不同的键,我想得到这些值,我目前正在Map和加入6个不同的时间在我的工作。一定有更好的办法吧?
1条答案
按热度按时间hmtdttj41#
如果每列中不同值的数目足够小,可以将它们全部放入内存中,则可以
.map
将您的列添加到Map[String,Int]
,然后.groupAll.sum
为了一次性地计算它们(我使用的是“类型化api”表示法,不太记得在字段api中是如何实现的,但是您已经明白了这一点)。你需要使用MapMonoid
从algebird开始,或者你自己写,如果你不想为这件事添加依赖项,这并不难。然后您将得到一个管道,其中包含一个用于生成结果的条目Map
. 现在,你可以得到你原来的管道,并做.crossWithTiny
把带计数的Map带进去,然后.map
提取单个计数。否则,如果你不能把这些都记在记忆里,那么你现在做什么似乎是唯一的办法。。。除非你真的在寻找一个“顶级杀手”的近似值,而不是整个宇宙的精确计数。。。在这种情况下,请查看algebird的草图。