java—如何格式化hadoop中mapreduce编写的输出

au9on6nz  于 2021-06-02  发布在  Hadoop
关注(0)|答案(5)|浏览(413)

我正试着用每个单词来颠倒文件的内容。我有运行良好的程序,但我得到的输出是这样的东西

1   dwp
2   seviG
3   eht
4   tnerruc
5   gnikdrow
6   yrotcerid
7   ridkm
8   desU
9   ot
10  etaerc

我希望输出像这样

dwp seviG eht tnerruc gnikdrow yrotcerid ridkm desU
ot etaerc

我正在使用的代码

import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class Reproduce {

    public static int temp =0;
    public static class ReproduceMap extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text>{
        private Text word = new Text();
        @Override
        public void map(LongWritable arg0, Text value,
                OutputCollector<IntWritable, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString().concat("\n");
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(new StringBuffer(tokenizer.nextToken()).reverse().toString());
                temp++;
                output.collect(new IntWritable(temp),word);
              }

        }

    }

    public static class ReproduceReduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text>{

        @Override
        public void reduce(IntWritable arg0, Iterator<Text> arg1,
                OutputCollector<IntWritable, Text> arg2, Reporter arg3)
                throws IOException {
            String word = arg1.next().toString();
            Text word1 = new Text();
            word1.set(word);
            arg2.collect(arg0, word1);

        }

    }

    public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(ReproduceMap.class);
    conf.setReducerClass(ReproduceReduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

  }
}

如何修改输出,而不是编写另一个java程序来实现这一点
提前谢谢

34gzjxbg

34gzjxbg1#

在mapper中,每个单词的键值都会递增,因此每个单词都作为一个单独的键值对进行处理。
下面的步骤应该可以解决问题1)在mapper中只需删除temp++,这样所有反转的单词的键都将是0(temp=0)。
2) reducer接收键0和反转字符串列表。在reducer中,将键设置为nullwriteable并写入输出。

bsxbgnwa

bsxbgnwa2#

你可以用 NullWritable 作为输出值。nullwritable只是一个占位符,因为您不希望数字显示为输出的一部分。我已经给你上课了。注意:-需要为nullwriteable添加import语句

public static class ReproduceReduce extends MapReduceBase implements Reducer<IntWritable, Text,  Text, NullWritable>{

            @Override
            public void reduce(IntWritable arg0, Iterator<Text> arg1,
                    OutputCollector<Text, NullWritable> arg2, Reporter arg3)
                    throws IOException {
                String word = arg1.next().toString();
                Text word1 = new Text();
                word1.set(word);
                arg2.collect(word1, new NullWritable());

            }

        }

并更改驱动程序类或主方法

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(NullWritable.class);
pxyaymoc

pxyaymoc3#

我们可以通过编写自定义fileoutputformat类来定制输出

rsaldnfx

rsaldnfx4#

您可以尝试使用一个常量键(或者简单地说是nullwriteable),并将其作为键传递,将整行作为值传递(您可以在mapper类中反转它,也可以在reducer类中反转它)。因此,您的reducer将收到一个常量键(或者占位符,如果您将nullwriteable用作键)和完整的行。现在您可以简单地反转该行并将其写入输出文件。通过不使用tmp作为键,可以避免在输出文件中写入不需要的数字。

qcuzuvrc

qcuzuvrc5#

下面是一个简单的代码演示如何使用自定义fileoutputformat

public class MyTextOutputFormat extends FileOutputFormat<Text, List<IntWritable>> {
      @Override
      public org.apache.hadoop.mapreduce.RecordWriter<Text, List<Intwritable>> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
         //get the current path
         Path path = FileOutputFormat.getOutputPath(arg0);
         //create the full path with the output directory plus our filename
         Path fullPath = new Path(path, "result.txt");
     //create the file in the file system
     FileSystem fs = path.getFileSystem(arg0.getConfiguration());
     FSDataOutputStream fileOut = fs.create(fullPath, arg0);

     //create our record writer with the new file
     return new MyCustomRecordWriter(fileOut);
  }
}

public class MyCustomRecordWriter extends RecordWriter<Text, List<IntWritable>> {
    private DataOutputStream out;

    public MyCustomRecordWriter(DataOutputStream stream) {
        out = stream;
        try {
            out.writeBytes("results:\r\n");
        }
        catch (Exception ex) {
        }  
    }

    @Override
    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
        //close our file
        out.close();
    }

    @Override
    public void write(Text arg0, List arg1) throws IOException, InterruptedException {
        //write out our key
        out.writeBytes(arg0.toString() + ": ");
        //loop through all values associated with our key and write them with commas between
        for (int i=0; i<arg1.size(); i++) {
            if (i>0)
                out.writeBytes(",");
            out.writeBytes(String.valueOf(arg1.get(i)));
        }
        out.writeBytes("\r\n");  
    }
}

最后,我们需要在运行作业之前告诉它输出格式和路径。

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ArrayList.class);
job.setOutputFormatClass(MyTextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/out"));

相关问题