I implemented a custom output format that converts key-value pairs to JSON.
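import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {

    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        // The output file is named after the job and created in the job output directory.
        FSDataOutputStream out = fs.create(new Path(path, context.getJobName()));
        return new JsonRecordWriter(out);
    }

    // Nested in JSONOutputFormat so that it can extend TextOutputFormat.LineRecordWriter.
    private static class JsonRecordWriter extends LineRecordWriter<Text, IntWritable> {
        boolean firstRecord = true;

        @Override
        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.writeBytes("}");
            super.close(context);
        }

        @Override
        public synchronized void write(Text key, IntWritable value) throws IOException {
            if (!firstRecord) {
                out.writeBytes(",\r\n");
                firstRecord = false;
            }
            out.writeUTF(key.toString() + ":" + value.toString());
        }

        public JsonRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            out.writeBytes("{");
        }
    }
}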
However, the output of the MapReduce job contains some unwanted characters, for example: {nulchair:12 nulbs book:1}
My driver class is as follows:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (word, 1) for every space-separated token in the input line.
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts collected for this word.
            int count = 0;
            for (IntWritable c : values) {
                count += c.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "wordcountjson");
        job.setJarByClass(Driver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(JSONOutputFormat.class);
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Do you know why these characters appear in the output?
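One thing that may be worth checking: `JsonRecordWriter.write` calls `out.writeUTF(...)`, and `DataOutputStream.writeUTF` writes a two-byte length in front of the string, followed by the string in modified UTF-8. For an 8-character record such as `chair:12`, those two bytes are 0x00 and 0x08, which many viewers display as NUL and BS. The standalone sketch below compares `writeUTF` with writing the raw UTF-8 bytes. It uses no Hadoop classes, and the class name `WriteUtfDemo` and its two helper methods are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WriteUtfDemo {

    // Bytes emitted by writeUTF: a 2-byte length, then the string in modified UTF-8.
    public static byte[] viaWriteUTF(String s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(s);
        }
        return buf.toByteArray();
    }

    // Bytes emitted when the string is encoded explicitly and written as-is.
    public static byte[] viaWrite(String s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.write(s.getBytes(StandardCharsets.UTF_8));
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] prefixed = viaWriteUTF("chair:12");
        byte[] plain = viaWrite("chair:12");
        // prints "10 bytes, prefix = 0, 8"
        System.out.println(prefixed.length + " bytes, prefix = " + prefixed[0] + ", " + prefixed[1]);
        // prints "8 bytes"
        System.out.println(plain.length + " bytes");
    }
}
```

If this is indeed the cause, replacing the `writeUTF` call with `out.write(...getBytes(StandardCharsets.UTF_8))` should remove the extra bytes. Two further points that look suspicious in the writer: `firstRecord` is only set to `false` inside the `if (!firstRecord)` branch, so the `",\r\n"` separator appears never to be written, and valid JSON would need the keys wrapped in double quotes.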