如何删除合并器输出并在mapreduce final输出中只保留reducer输出

jdg4fx2g  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(402)

嗨,我正在运行一个应用程序,从hbase读取记录并写入文本文件。
我在应用程序中使用了组合器,也使用了自定义分区器。我在应用程序中使用了41 reducer,因为我需要创建40 reducer输出文件,以满足自定义分区器中的条件。
所有工作正常,但当我在我的应用程序中使用组合器,它创建每个区域或每个Map器的Map输出文件。
例如,我的应用程序中有40个区域,因此启动了40个Map程序,然后创建了40个Map输出文件。但是reducer不能合并所有的map输出并生成最终的reducer输出文件,该文件将是40个reducer输出文件。
文件中的数据是正确的,但没有任何文件增加。
你知道我怎么才能只得到减速机输出文件吗。

  1. import java.io.IOException;
  2. import org.apache.log4j.Logger;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  7. public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {
  8. private Logger logger = Logger.getLogger(CommonCombiner.class);
  9. private MultipleOutputs<NullWritable, Text> multipleOutputs;
  10. String strName = "";
  11. private static final String DATA_SEPERATOR = "\\|\\!\\|";
  12. public void setup(Context context) {
  13. logger.info("Inside Combiner.");
  14. multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
  15. }
  16. @Override
  17. public void reduce(NullWritable Key, Iterable<Text> values, Context context)
  18. throws IOException, InterruptedException {
  19. for (Text value : values) {
  20. final String valueStr = value.toString();
  21. StringBuilder sb = new StringBuilder();
  22. if ("".equals(strName) && strName.length() == 0) {
  23. String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
  24. String strFullFileName[] = strArrFileName[1].split("\\|\\^\\|");
  25. strName = strFullFileName[strFullFileName.length - 1];
  26. String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
  27. if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
  28. sb.append(strArrvalueStr[0] + "|!|");
  29. }
  30. multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
  31. context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);
  32. }
  33. }
  34. }
  35. public void cleanup(Context context) throws IOException, InterruptedException {
  36. multipleOutputs.close();
  37. }
  38. }
cig3rfwq

cig3rfwq1#

让我们把基本问题弄清楚
合并器是一种优化,既可以在Map器上运行,也可以在reduce(reduce的合并阶段)(fetch-merge-reduce阶段)中运行。
找出密钥在数据中的分布情况,给定的Map器是否访问同一批密钥如果是的话,那么combiner就是在帮助别人它没有效果。
1k个区域没有保证它们是平等划分的。你有一些很热的地方
找到热点区域并分开。
请注意:http://bytepadding.com/big-data/map-reduce/understanding-map-reduce-the-missing-guide/

xu3bshqb

xu3bshqb2#

您没有从合并器输出任何数据以供减速机使用。在您的组合器中,您正在使用: multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName); 这并不是你如何写出数据,以便在两个阶段之间使用,即从Map器或组合器到reduce阶段。您应该使用: context.write() 在需要多个文件的地方,多路输出只是一种将额外文件写入磁盘的方法。我从没见过它用在合路器上。

相关问题