MultipleOutputs with zero reducers

Posted by x6yk4ghg on 2021-06-04 in Hadoop

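My input folder has 200 files. I want to use MultipleOutputs to write the parsed input from each file (identified via "map.input.file") to an output file with the same name. Since I have no aggregation to perform, I am using the zero-reducer option (conf.setNumReduceTasks(0)). Ideally, I should get 200 output files.
However, my output contains roughly 5,000 or more files, each holding a single line of streamed output. Clearly the lines are not being grouped per input file. My assumption was that, with zero reducers, the mapper output for each input file would end up together.
Any help is appreciated. Thanks!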

public static void main(String[] args) throws IOException {
    if (args.length != 2) {
        System.err.println("Usage: MultipleOutputEx <input path> <output path>");
        System.exit(-1);
    }

    JobConf conf = new JobConf(MultipleOutputEx.class);
    conf.setJobName("Duration Count");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Map-only job: with zero reduce tasks the reducer class set below is never run.
    conf.setNumReduceTasks(0);
    conf.setMapperClass(MultipleOutputExMapper.class);
    conf.setReducerClass(MultipleOutputExReducer.class);
    conf.setMapOutputKeyClass(NullWritable.class);
    // Register the multi-named output "mofiles" used by the mapper.
    MultipleOutputs.addMultiNamedOutput(conf, "mofiles", TextOutputFormat.class, NullWritable.class, Text.class);
    JobClient.runJob(conf);
}
My mapper class is:

public class MultipleOutputExMapper extends MapReduceBase implements
    Mapper<LongWritable, Text, NullWritable, Text> {

MultipleOutputs mos = null;
Text fileKey = new Text();
String line = "";
private JobConf conf;

@Override
public void configure(JobConf conf) {
    this.conf = conf;
    mos = new MultipleOutputs(conf);
}

public void map(LongWritable key, Text value,
        OutputCollector<NullWritable, Text> output, Reporter reporter)
        throws IOException {
    try {
        String filename = conf.get("map.input.file");
        fileKey.set(filename);
        OutputCollector<NullWritable, Text> collector = mos.getCollector(
                "mofiles", key.toString(), reporter);
        collector.collect(NullWritable.get(), value);

    } catch (ArrayIndexOutOfBoundsException E) {
        E.printStackTrace();
    } catch (Exception E) {
        System.out.println(line);
        E.printStackTrace();
    }
}

@Override
public void close() throws IOException {
    // Flush and close every named output opened by this task.
    mos.close();
}
}

Answer #1 (mrfwxfqh)

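You are creating an output file for every unique key (as @climpage suggested in their comment). Assuming the default TextInputFormat, the key passed to map() is the byte offset of the line, so passing key.toString() to getCollector() gives nearly every line its own file. Try modifying the mapper like this (untested and uncompiled):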

protected OutputCollector<NullWritable, Text> collector = null;
protected String filename = null;

@Override
public void configure(JobConf conf) {
    this.conf = conf;
    mos = new MultipleOutputs(conf);

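    // get the filename (just the name, not the path)
    // NOTE (assumption to verify against your Hadoop version): multi names passed to
    // getCollector() may be limited to letters and digits, so a name containing '.'
    // or '-' may need to be stripped of those characters first.
    filename = new Path(conf.get("map.input.file")).getName();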
}

public void map(LongWritable key, Text value,
    OutputCollector<NullWritable, Text> output, Reporter reporter)
    throws IOException {

    try {
        if (collector == null) {
            // create an output collector for the file
            collector = mos.getCollector("mofiles", filename, reporter);
        }

        collector.collect(NullWritable.get(), value);
    } catch (ArrayIndexOutOfBoundsException E) {
        E.printStackTrace();
    } catch (Exception E) {
        System.out.println(line);
        E.printStackTrace();
    }
}

@Override
public void close() throws IOException {
    mos.close();
}
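
To make the difference concrete, here is a small Hadoop-free sketch of the naming logic only. It assumes, as far as I recall from the old `org.apache.hadoop.mapred.lib.MultipleOutputs`, that multi names may contain only ASCII letters and digits; please check that against your version. The class `MultiNameSketch` and the methods `toMultiName` and `countOutputs` are hypothetical names made up for this illustration, not part of Hadoop, and the file name and offsets are invented sample values.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative only: models how many distinct named outputs one map task would open.
final class MultiNameSketch {
    private MultiNameSketch() {}

    /** Keeps ASCII letters and digits and drops every other character (assumed naming rule). */
    static String toMultiName(String fileName) {
        StringBuilder sb = new StringBuilder(fileName.length());
        for (int i = 0; i < fileName.length(); i++) {
            char ch = fileName.charAt(i);
            boolean letter = (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z');
            boolean digit = ch >= '0' && ch <= '9';
            if (letter || digit) {
                sb.append(ch);
            }
        }
        if (sb.length() == 0) {
            throw new IllegalArgumentException("No letters or digits in: " + fileName);
        }
        return sb.toString();
    }

    /**
     * Counts distinct output names for the lines of one input file.
     * nameByKey = true mimics the question (name = byte offset of the line);
     * nameByKey = false mimics the answer (name = input file name).
     */
    static int countOutputs(long[] lineOffsets, String fileName, boolean nameByKey) {
        Set<String> names = new HashSet<>();
        for (long offset : lineOffsets) {
            names.add(nameByKey ? Long.toString(offset) : toMultiName(fileName));
        }
        return names.size();
    }

    public static void main(String[] args) {
        long[] offsets = {0L, 18L, 41L}; // three lines of one input file
        System.out.println(countOutputs(offsets, "station-01.txt", true));  // prints 3
        System.out.println(countOutputs(offsets, "station-01.txt", false)); // prints 1
        System.out.println(toMultiName("station-01.txt"));                  // prints station01txt
    }
}
```

Note that stripping characters can make two different file names collide (for example "a.b" and "ab"), so a real job might need a different mapping. Also, each map task writes its own files, so an input file that is split across several map tasks would still yield more than one output file.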
