Multiple output format in Hadoop

jk9hmnmh  asked 2022-12-11  in Hadoop

I am new to Hadoop and am trying out the WordCount program.
Now, to test multiple output files, I am using MultipleOutputs. This link helped me do it: http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
In my driver class I have:

    MultipleOutputs.addNamedOutput(conf, "even",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
            IntWritable.class);
    MultipleOutputs.addNamedOutput(conf, "odd",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
            IntWritable.class);

And my reduce class became this:

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        MultipleOutputs mos = null;

        public void configure(JobConf job) {
            mos = new MultipleOutputs(job);
        }

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            if (sum % 2 == 0) {
                mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
            } else {
                mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
            }
            //output.collect(key, new IntWritable(sum));
        }

        @Override
        public void close() throws IOException {
            mos.close();
        }
    }

This works, but I end up with a lot of files (one odd and one even file for every reducer).
The question: how can I get only two output files (odd and even), so that every odd output from every reducer is written to the odd file, and likewise for even?

czq61nw1 #1

Each reducer uses an OutputFormat to write records. That is why you get one set of odd and even files per reducer; it is by design, so that each reducer can perform its writes in parallel.
If you only want a single odd file and a single even file, you need to set mapred.reduce.tasks to 1. Performance will suffer, though, because all of the mappers will feed into a single reducer.
Another option is to change the process that reads these files so it accepts multiple input files, or to write a separate process that merges the files together.
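For illustration, a minimal sketch (not from the answer itself) of forcing a single reducer with the old mapred API that the question's driver uses; conf is assumed to be the driver's JobConf:

    // Assumes conf is the JobConf built in the driver class.
    // Equivalent to passing -D mapred.reduce.tasks=1: a single reducer then
    // produces exactly one "even" file and one "odd" file, but all map output
    // is shuffled to that one task.
    conf.setNumReduceTasks(1);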

dvtswwa3 #2

I wrote a class for this. Use it in your job:

    job.setOutputFormatClass(m_customOutputFormatClass);

Here is my class:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    /**
     * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br>
     * <p>
     * <b>WARNING</b>: The number of different folders shouldn't be large for one mapper since we keep a
     * {@link RecordWriter} instance per folder name.
     * </p>
     * <p>
     * In this class the folder name is defined by the written entry's key.<br>
     * To change this behavior simply extend this class, override the
     * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
     * {@link FolderNameExtractor} implementation.
     * </p>
     *
     * @author ykesten
     *
     * @param <K> - Keys type
     * @param <V> - Values type
     */
    public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {

        private String folderName;

        private class MultipleFilesRecordWriter extends RecordWriter<K, V> {

            private Map<String, RecordWriter<K, V>> fileNameToWriter;
            private FolderNameExtractor<K, V> fileNameExtractor;
            private TaskAttemptContext job;

            public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
                fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
                this.fileNameExtractor = fileNameExtractor;
                this.job = job;
            }

            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                String fileName = fileNameExtractor.extractFolderName(key, value);
                RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
                if (writer == null) {
                    writer = createNewWriter(fileName, fileNameToWriter, job);
                    if (writer == null) {
                        throw new IOException("Unable to create writer for path: " + fileName);
                    }
                }
                writer.write(key, value);
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                    entry.getValue().close(context);
                }
            }
        }

        private synchronized RecordWriter<K, V> createNewWriter(String folderName,
                Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
            try {
                this.folderName = folderName;
                RecordWriter<K, V> writer = super.getRecordWriter(job);
                this.folderName = null;
                fileNameToWriter.put(folderName, writer);
                return writer;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        @Override
        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
            Path path = super.getDefaultWorkFile(context, extension);
            if (folderName != null) {
                String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
                path = new Path(newPath);
            }
            return path;
        }

        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
        }

        public FolderNameExtractor<K, V> getFolderNameExtractor() {
            return new KeyFolderNameExtractor<K, V>();
        }

        public interface FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value);
        }

        private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value) {
                return key.toString();
            }
        }
    }
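For completeness, a minimal driver sketch (not from the original answer; the class name MultiFolderDriver and the command-line output path are made up for illustration) showing one way to wire the class above into a new-API job:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultiFolderDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "wordcount-multi-folder");
            job.setJarByClass(MultiFolderDriver.class);
            // Route all output through the custom format from the answer above.
            job.setOutputFormatClass(HdMultipleFileOutputFormat.class);
            // Mapper/reducer classes, key/value classes and the input path are omitted here.
            FileOutputFormat.setOutputPath(job, new Path(args[0]));
            // With the default KeyFolderNameExtractor, each record is written under
            // <output dir>/<key>/part-r-xxxxx, i.e. records are grouped into folders named after their keys.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }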
raogr8fs #3

Multiple output files will be generated according to the number of reducers.
You can use hadoop dfs -getmerge to merge the output afterwards.
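For reference, the usual form of that command; the HDFS output directory and the local file name below are placeholders:

    hadoop fs -getmerge /user/hadoop/wordcount/output merged-output.txt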

olqngx59 #4

You can try changing the output file name (the reducer output). Since HDFS only supports append operations, the idea is to collect all the temp-r-0000x files (partitions) from all reducers and append them into one file.
Here is the class you need to create; it overrides methods of TextOutputFormat:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class CustomNameMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {

        private String folderName;

        private class MultipleFilesRecordWriter extends RecordWriter<K, V> {

            private Map<String, RecordWriter<K, V>> fileNameToWriter;
            private FolderNameExtractor<K, V> fileNameExtractor;
            private TaskAttemptContext job;

            public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
                fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
                this.fileNameExtractor = fileNameExtractor;
                this.job = job;
            }

            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                String fileName = "[FOLDER_NAME_INCLUDING_SUB_DIRS]"; // fileNameExtractor.extractFolderName(key, value);
                RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
                if (writer == null) {
                    writer = createNewWriter(fileName, fileNameToWriter, job);
                    if (writer == null) {
                        throw new IOException("Unable to create writer for path: " + fileName);
                    }
                }
                writer.write(key, value);
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                    entry.getValue().close(context);
                }
            }
        }

        private synchronized RecordWriter<K, V> createNewWriter(String folderName,
                Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
            try {
                this.folderName = folderName;
                RecordWriter<K, V> writer = super.getRecordWriter(job);
                this.folderName = null;
                fileNameToWriter.put(folderName, writer);
                return writer;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        @Override
        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
            Path path = super.getDefaultWorkFile(context, extension);
            if (folderName != null) {
                String newPath = path.getParent().toString() + "/" + folderName + "/[ONE_FILE_NAME]";
                path = new Path(newPath);
            }
            return path;
        }

        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
        }

        public FolderNameExtractor<K, V> getFolderNameExtractor() {
            return new KeyFolderNameExtractor<K, V>();
        }

        public interface FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value);
        }

        private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value) {
                return key.toString();
            }
        }
    }

Then in your reducer/mapper:

    public static class ExtraLabReducer extends Reducer<CustomKeyComparable, Text, CustomKeyComparable, Text> {

        MultipleOutputs multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs(context);
        }

        @Override
        public void reduce(CustomKeyComparable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text d : values) {
                multipleOutputs.write("batta", key, d, "[EXAMPLE_FILE_NAME]");
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }

Then in the job configuration:

    Job job = new Job(getConf(), "ExtraLab");
    job.setJarByClass(ExtraLab.class);

    job.setMapperClass(ExtraLabMapper.class);
    job.setReducerClass(ExtraLabReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    job.setMapOutputKeyClass(CustomKeyComparable.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    //adding one more reducer
    job.setNumReduceTasks(2);

    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "batta", CustomNameMultipleFileOutputFormat.class, CustomKeyComparable.class, Text.class);
