MapReduce custom TextOutputFormat: strange characters NUL, SOH, etc.

5uzkadbs  posted on 2021-06-03  in  Hadoop

I implemented a custom output format that converts key-value pairs to JSON.

public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(new Path(path, context.getJobName()));
        return new JsonRecordWriter(out);
    }
}

private static class JsonRecordWriter extends LineRecordWriter<Text,IntWritable>{
    boolean firstRecord = true;
    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.writeBytes("}");
        super.close(context);
    }

    @Override
    public synchronized void write(Text key, IntWritable value)
            throws IOException {
        if (!firstRecord){

            out.writeBytes(",\r\n");
            firstRecord = false;
        }
        out.writeUTF(key.toString() + ":" +value.toString());
    }

    public JsonRecordWriter(DataOutputStream out) throws IOException{
        super(out);
        out.writeBytes("{");
    }
}
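
A separate detail in the writer above may be worth checking: `firstRecord` starts as `true` and is only set to `false` inside the `if (!firstRecord)` branch, so that branch can never run and no `,` separator is written between records. The sketch below isolates the separator logic using plain `java.io` (no Hadoop). `SeparatorDemo` and `join` are hypothetical names, and quoting the keys so the result parses as JSON is an added assumption, not something the original code does.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class SeparatorDemo {

    // Hypothetical helper: writes the entries as {"key":value,...} into an in-memory stream.
    static String join(Map<String, Integer> counts) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            boolean firstRecord = true;
            out.writeBytes("{");
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                if (!firstRecord) {
                    out.writeBytes(",\r\n"); // separator before every record except the first
                }
                firstRecord = false; // cleared outside the if, so later records get a separator
                // Assumption: keys are quoted but not escaped, so keys containing
                // quotes or backslashes are out of scope for this sketch.
                String record = "\"" + e.getKey() + "\":" + e.getValue();
                out.write(record.getBytes(StandardCharsets.UTF_8));
            }
            out.writeBytes("}");
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("chair", 12);
        counts.put("book", 1);
        System.out.println(join(counts));
    }
}
```

In the record writer itself, the equivalent change would be moving `firstRecord = false;` out of the `if` block in `write`.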

However, the MapReduce job's output contains some unwanted characters, for example: {nulchair:12 nulbs book:1}
My driver class is as follows:

public class Driver {

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    IntWritable one = new IntWritable(1);
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for(String word: words)
            context.write(new Text(word), one);
    }
}
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the per-word counts; the typed for-each avoids the raw Iterator and the cast.
        int count = 0;
        for (IntWritable c : values) {
            count += c.get();
        }
        context.write(key, new IntWritable(count));
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "wordcountjson");
    job.setJarByClass(Driver.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputFormatClass(JSONOutputFormat.class);

    job.setNumReduceTasks(1);

    System.exit(job.waitForCompletion(true)?0:1);

}

}
Do you know why these characters appear in the output?
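
One likely source of the stray bytes, offered as a sketch rather than a confirmed answer: `DataOutputStream.writeUTF` does not write plain text. It first writes the encoded length as a two-byte big-endian integer and then the string in modified UTF-8. For a short record such as `chair:12` (8 bytes) the prefix is `0x00 0x08`, which a text viewer shows as NUL and BS. The standalone example below uses only `java.io` (no Hadoop); the class name `WriteUtfDemo` and its helper methods are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class WriteUtfDemo {

    // Encodes a record the way the question's writer does: writeUTF adds a 2-byte length prefix.
    static byte[] viaWriteUTF(String record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(record);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return buffer.toByteArray();
    }

    // Encodes the same record as plain UTF-8 bytes, with no length prefix.
    static byte[] viaRawBytes(String record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return buffer.toByteArray();
    }

    // Formats bytes as space-separated two-digit hex values.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(hex(viaWriteUTF("chair:12"))); // 00 08 63 68 61 69 72 3a 31 32
        System.out.println(hex(viaRawBytes("chair:12"))); // 63 68 61 69 72 3a 31 32
    }
}
```

If this is the cause, replacing `out.writeUTF(...)` in the record writer with `out.write(text.getBytes(StandardCharsets.UTF_8))` should remove the prefix bytes. That substitution is an assumption and has not been run against the job in the question.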
