Top N results with MapReduce

dfty9e19 · posted 2021-07-13 · Hadoop

I'm working on a Hadoop project in Java and I've hit some difficulty. I understand the goal of what I'm supposed to do, but I really don't know how to implement it. I'm trying to extract the top N results from a MapReduce job, for example the 5 values with the highest frequency.
I know this generally takes two MapReduce jobs: one to do the reduction and another to sort the values. However, as I said, I'm fairly lost on how to actually implement this.
The code I'm using is fairly standard MapReduce code with some filtering for special values.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        private Text wordToken = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // dividing the line into tokens, treating the listed characters as delimiters
            StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");
            while (tokens.hasMoreTokens())
            {
                wordToken.set(tokens.nextToken());
                context.write(wordToken, new IntWritable(1));
            }
        }
    }

Reducer

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable count = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int valueSum = 0;
            for (IntWritable val : values)
            {
                valueSum += val.get();
            }
            count.set(valueSum);
            context.write(key, count);
        }
    }

Driver

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (pathArgs.length < 2)
            {
                System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
                System.exit(2);
            }

            Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
            wcJob.setJarByClass(WordCount.class);
            wcJob.setMapperClass(MapWordCount.class);
            wcJob.setCombinerClass(ReduceWordCount.class);
            wcJob.setReducerClass(ReduceWordCount.class);
            wcJob.setOutputKeyClass(Text.class);
            wcJob.setOutputValueClass(IntWritable.class);

            for (int i = 0; i < pathArgs.length - 1; ++i)
            {
                FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
            }
            FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));

            System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
        }
    }

If anyone could help me out with this, I would greatly appreciate it. As I said, I know I need two MapReduce jobs, but I'm not really sure how to get started. I've tried several other solutions found on StackOverflow, but haven't had much luck with them for my case. Thanks a lot!

shyt4zoc · Answer 1

You are indeed correct: you do need to chain two MapReduce jobs together. More specifically, you need:
one job to compute the wordcount of every word stored in the input documents, and
one job that is able to "sort" all of those words and wordcounts, in order to select and output the top N of them.
The first job is quite similar to what you already have, so I will focus on the second one, to make it clearer how TopN works in the MapReduce paradigm.
Thinking of the TopN MR job as a standalone thing, we know that this particular job will receive a bunch of key-value pairs, where each word from the previous step is a key and its wordcount is the value. Since mappers and reducers are instances of the map and reduce functions running in parallel, we need to find a way to first compute the topN words locally (i.e. within each mapper), and then group all of those local topN results to find the "global" topN words across all the data that was given as input to the application.
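Both halves of that strategy boil down to maintaining a small, sorted structure that never holds more than N entries. Here is a minimal, Hadoop-free sketch of that bounded-TreeMap idea (the class and method names are purely illustrative and not part of the actual jobs below):

    import java.util.Map;
    import java.util.TreeMap;

    // Illustrative sketch only: keeps the N highest-count words seen so far.
    // A TreeMap<Integer, String> keyed by wordcount stays sorted by count,
    // so evicting firstKey() always drops the currently smallest count.
    // Like the full code below, two words with the exact same count share a
    // key, so one overwrites the other (an accepted simplification here).
    public class BoundedTopN
    {
        private final int n;
        private final TreeMap<Integer, String> topN = new TreeMap<Integer, String>();

        public BoundedTopN(int n)
        {
            this.n = n;
        }

        public void offer(String word, int count)
        {
            topN.put(count, word);            // sorted insertion by wordcount
            if (topN.size() > n)              // keep at most N entries...
                topN.remove(topN.firstKey()); // ...by evicting the smallest count
        }

        public static void main(String[] args)
        {
            BoundedTopN top3 = new BoundedTopN(3);
            top3.offer("the", 120);
            top3.offer("to", 75);
            top3.offer("hadoop", 12);
            top3.offer("mapreduce", 9);

            // prints the surviving entries in ascending count order:
            // 12=hadoop, 75=to, 120=the
            for (Map.Entry<Integer, String> entry : top3.topN.entrySet())
                System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }

Because the eviction keeps at most N entries at all times, memory stays bounded no matter how many words a single mapper sees.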
So the TopNMapper first has to create a TreeMap (a Java key-value data structure that keeps its elements internally sorted by key) in its setup function, which runs once per mapper instance before any input records are processed, so every mapper initializes its own such object. Each word and its wordcount are put into it as elements; for this type of computation (TopN) we use the wordcount as the key and the word as the value, to get a list of words sorted by wordcount in ascending order. Since we only need to find the top N words, it is safe to say that we only want the top N words of each mapper, so we can remove all the elements below them and end up with a TreeMap of N elements by the end of each mapper's execution (i.e. through the cleanup function). The mappers then write key-value pairs where the word is the key and its wordcount is the value, like this: <word, wordcount>.

For the TopNReducer we need to do the same thing with a TreeMap: populate it with all of the local topN elements, remove the elements that are not among the global top N, and write the words and their wordcounts as output. To be a bit "cleaner", we can "reverse" the word and wordcount in the key-value pair structure, so the wordcount becomes the key and the word becomes the value. This results in an (ascending) sorted set of key-value pairs that is stored on disk once the job is done, like this: <wordcount, word>.

A program that does all of this in 2 MR jobs is the following (here we set N as a "global" value in the Configuration inside the main function with the conf.set("N", "10") command, and read it back in the setup functions of the TopNMapper and TopNReducer classes). All classes are placed inside a single TopNWordCount class for simplicity:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import java.io.IOException;
    import java.util.Map;
    import java.util.TreeMap;

    public class TopNWordCount
    {
        /* input:  <document, contents>
         * output: <word, 1>
         */
        public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
        {
            private final static IntWritable one = new IntWritable(1);

            public void map(Object key, Text value, Context context) throws IOException, InterruptedException
            {
                // clean up the document text and split the words into an array
                String[] words = value.toString()
                        .replaceAll("\\d+", "")         // get rid of numbers...
                        .replaceAll("[^a-zA-Z ]", " ")  // get rid of punctuation...
                        .toLowerCase()                  // turn every letter to lowercase...
                        .trim()                         // trim the spaces...
                        .replaceAll("\\s+", " ")        // ...and collapse repeated whitespace
                        .split(" ");

                // write every word as key with `1` as value, indicating that the word
                // was found at least 1 time inside the input text
                for (String word : words)
                    context.write(new Text(word), one);
            }
        }

        /* input:  <word, 1>
         * output: <word, wordcount>
         */
        public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
        {
            private IntWritable wordcount = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
            {
                int word_cnt = 0;
                for (IntWritable value : values)
                    word_cnt += value.get();

                wordcount.set(word_cnt);
                context.write(key, wordcount);
            }
        }

        /* input:  <word, wordcount>
         * output: <word, wordcount> (only the local topN words of each mapper)
         */
        public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable>
        {
            private int n;                               // the N of TopN
            private TreeMap<Integer, String> word_list;  // local list with words sorted by their frequency

            public void setup(Context context)
            {
                n = Integer.parseInt(context.getConfiguration().get("N")); // get N
                word_list = new TreeMap<Integer, String>();
            }

            public void map(Object key, Text value, Context context)
            {
                String[] line = value.toString().split("\t"); // split the word and the wordcount

                // put the wordcount as key and the word as value in the word list
                // so the words can be sorted by their wordcounts
                word_list.put(Integer.valueOf(line[1]), line[0]);

                // if the local word list is populated with more than N elements,
                // remove the first one (aka the word with the smallest wordcount)
                if (word_list.size() > n)
                    word_list.remove(word_list.firstKey());
            }

            public void cleanup(Context context) throws IOException, InterruptedException
            {
                // write the topN local words before continuing to the TopNReducer,
                // with each word as key and its wordcount as value
                for (Map.Entry<Integer, String> entry : word_list.entrySet())
                {
                    context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
                }
            }
        }

        /* input:  <word, wordcount> (the local topN words)
         * output: <wordcount, word> (the global topN words)
         */
        public static class TopNReducer extends Reducer<Text, IntWritable, IntWritable, Text>
        {
            private int n;                               // the N of TopN
            private TreeMap<Integer, String> word_list;  // list with words globally sorted by their frequency

            public void setup(Context context)
            {
                n = Integer.parseInt(context.getConfiguration().get("N")); // get N
                word_list = new TreeMap<Integer, String>();
            }

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
            {
                int wordcount = 0;

                // get the one and only value (aka the wordcount) for each word
                for (IntWritable value : values)
                    wordcount = value.get();

                // put the wordcount as key and the word as value in the word list
                // so the words can be sorted by their wordcounts
                word_list.put(wordcount, key.toString());

                // if the global word list is populated with more than N elements,
                // remove the first one (aka the word with the smallest wordcount)
                if (word_list.size() > n)
                    word_list.remove(word_list.firstKey());
            }

            public void cleanup(Context context) throws IOException, InterruptedException
            {
                // write the topN global words with each wordcount as key and its word as value,
                // so the output will be sorted by the wordcount
                for (Map.Entry<Integer, String> entry : word_list.entrySet())
                {
                    context.write(new IntWritable(entry.getKey()), new Text(entry.getValue()));
                }
            }
        }

        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

            conf.set("N", "10"); // set N as a "public" value in the current Configuration

            if (pathArgs.length < 2)
            {
                System.err.println("MR Project Usage: TopNWordCount <input-path> [...] <output-path>");
                System.exit(2);
            }

            Path wordcount_dir = new Path("wordcount");
            Path output_dir = new Path(pathArgs[pathArgs.length - 1]);

            // if the in-between and output directories exist, delete them
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(wordcount_dir))
                fs.delete(wordcount_dir, true);
            if (fs.exists(output_dir))
                fs.delete(output_dir, true);

            Job wc_job = Job.getInstance(conf, "WordCount");
            wc_job.setJarByClass(TopNWordCount.class);
            wc_job.setMapperClass(WordCountMapper.class);
            wc_job.setReducerClass(WordCountReducer.class);
            wc_job.setMapOutputKeyClass(Text.class);
            wc_job.setMapOutputValueClass(IntWritable.class);
            wc_job.setOutputKeyClass(Text.class);
            wc_job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < pathArgs.length - 1; ++i)
            {
                FileInputFormat.addInputPath(wc_job, new Path(pathArgs[i]));
            }
            FileOutputFormat.setOutputPath(wc_job, wordcount_dir);
            wc_job.waitForCompletion(true);

            Job topn_job = Job.getInstance(conf, "TopN");
            topn_job.setJarByClass(TopNWordCount.class);
            topn_job.setMapperClass(TopNMapper.class);
            topn_job.setReducerClass(TopNReducer.class);
            topn_job.setMapOutputKeyClass(Text.class);
            topn_job.setMapOutputValueClass(IntWritable.class);
            topn_job.setOutputKeyClass(IntWritable.class);
            topn_job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(topn_job, wordcount_dir);
            FileOutputFormat.setOutputPath(topn_job, output_dir);
            topn_job.waitForCompletion(true);
        }
    }

The output of this program, using a directory of plain-text documents as input, is the 10 most frequent words and their wordcounts, written as <wordcount, word> pairs in ascending order of wordcount.

Notice that the top 10 words in that output are stopwords (like the, to, and so on), as we would expect. If you want to filter out those stopwords, you can of course compute TF-IDF and implement it with Hadoop.
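If full TF-IDF is more than you need, a much simpler (if cruder) alternative is to drop a fixed stopword list directly inside the wordcount mapper. Below is a hedged sketch of what that could look like for a mapper in the style of WordCountMapper above; the StopwordFilterMapper name and the tiny STOPWORDS set are illustrative only, not part of the answer's code:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative variation of the wordcount mapper that skips a fixed stopword list.
    // The stopword set here is a tiny sample; a real list would be much longer,
    // or loaded from a file shipped with the job.
    public class StopwordFilterMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private final static Set<String> STOPWORDS = new HashSet<String>(
                Arrays.asList("the", "to", "of", "and", "a", "in", "is", "it"));

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] words = value.toString()
                    .replaceAll("[^a-zA-Z ]", " ")  // same kind of cleanup as the answer's mapper
                    .toLowerCase()
                    .trim()
                    .split("\\s+");

            for (String word : words)
            {
                if (word.isEmpty() || STOPWORDS.contains(word))
                    continue;                       // skip stopwords and empty tokens
                context.write(new Text(word), one);
            }
        }
    }

With that in place, the rest of the two-job pipeline stays exactly the same; only the words that survive the filter ever reach the TopN stage.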
