MultipleOutputFormat performs only one iteration in the reduce step

Posted by ckocjqey on 2021-05-29 in Hadoop

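Here is my reducer. It takes EdgeWritable keys and NullWritable values.
An EdgeWritable holds four integers, e.g. <71, 74, 7, 2000>: a communication from 71 (fromId) to 74 (toId) in month 7 of year 2000.
The mapper emits 10787 records to the reducer, but the reducer outputs only 1.
I need to produce 44 output files, one for each of the 44 months between October 1998 and July 2002. Each file should be named "out" + month + year; for example, the records for July 2002 go into the file out72002.
I have debugged the code. After one iteration it writes one file and stops without picking up the next record. Please suggest how I should use MultipleOutputs. Thanks.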

public class MultipleOutputReducer extends Reducer<EdgeWritable, NullWritable, IntWritable, IntWritable>{
private MultipleOutputs<IntWritable,IntWritable> multipleOutputs;

protected void setup(Context context) throws IOException, InterruptedException{
    multipleOutputs = new MultipleOutputs<IntWritable, IntWritable>(context);

}

@Override
public void reduce(EdgeWritable key, Iterable<NullWritable> val, Context context) throws IOException, InterruptedException {
    int year = key.get(3).get();
    int month = key.get(2).get();
    int to = key.get(1).get();
    int from = key.get(0).get();

//if(year >= 1997 && year <= 2001){
        if((month >= 9 && year >= 1997) || (month <= 6 && year <= 2001)){

            multipleOutputs.write(new IntWritable(from), new IntWritable(to), "out"+month+year );
        }
    //}

}
@Override
public void cleanup(Context context) throws IOException, InterruptedException{
    multipleOutputs.close();
}

Driver

public class TimeSlicingDriver extends Configured implements Tool{

static final SimpleDateFormat sdf = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
public int run(String[] args) throws Exception {

    if(args.length != 2){
        System.out.println("Enter <input path> <output path>");
        System.exit(-1);
    }

    Configuration setup = new Configuration();
    //setup.set("Input Path", args[0]);
    Job job = new Job(setup, "Time Slicing");
    //job.setJobName("Time Slicing");
    job.setJarByClass(TimeSlicingDriver.class);

    job.setMapperClass(TimeSlicingMapper.class);
    job.setReducerClass(MultipleOutputReducer.class);

    //MultipleOutputs.addNamedOutput(setup, "output", org.apache.hadoop.mapred.TextOutputFormat.class, EdgeWritable.class, NullWritable.class);

    job.setMapOutputKeyClass(EdgeWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    /**Set the Input File Path and output file path*/
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true)?0:1;
}
Answer #1, by y1aodyip

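You are not iterating over the Iterable "val", so the logic in your reduce() runs only once per group. reduce() is called once for each group of keys that compare as equal, and every record in that group is delivered through val; loop over val and call multipleOutputs.write() inside the loop to emit one output per record.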
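To make the difference concrete, here is a minimal sketch in plain Java that imitates the two shapes of reduce() without needing a Hadoop cluster. The class ReduceIterationSketch, the NamedWriter interface and the int[] key are hypothetical stand-ins (for the reducer, MultipleOutputs.write and EdgeWritable respectively); only the "out" + month + year naming comes from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceIterationSketch {

    /** Hypothetical stand-in for MultipleOutputs.write(key, value, baseOutputPath). */
    interface NamedWriter {
        void write(int from, int to, String baseName);
    }

    /** Base file name used in the question: "out" + month + year. */
    static String outputName(int month, int year) {
        return "out" + month + year;
    }

    /** Shape of the reduce() in the question: the values are never iterated. */
    static void reduceWithoutLoop(int[] key, Iterable<Object> values, NamedWriter out) {
        out.write(key[0], key[1], outputName(key[2], key[3]));
    }

    /** Same logic, but one write per value in the group. */
    static void reduceWithLoop(int[] key, Iterable<Object> values, NamedWriter out) {
        for (Object ignored : values) {
            out.write(key[0], key[1], outputName(key[2], key[3]));
        }
    }

    public static void main(String[] args) {
        // Assumed key layout, as in the question: {from, to, month, year}.
        int[] key = {71, 74, 7, 2000};
        // Three records that were grouped under the same key.
        List<Object> group = Arrays.asList(new Object(), new Object(), new Object());

        List<String> withoutLoop = new ArrayList<>();
        reduceWithoutLoop(key, group, (from, to, name) -> withoutLoop.add(name));

        List<String> withLoop = new ArrayList<>();
        reduceWithLoop(key, group, (from, to, name) -> withLoop.add(name));

        System.out.println("without loop: " + withoutLoop.size() + " write(s)");
        System.out.println("with loop: " + withLoop.size() + " write(s)");
    }
}
```

In the reducer from the question, the equivalent change would be to wrap the existing write in `for (NullWritable ignored : val) { ... }`. If the key's comparison groups different edges together, it is probably safer to read from, to, month and year from the key inside that loop as well, since Hadoop reuses the key object and updates it as the values are iterated. Note also that MultipleOutputs appends a suffix such as -r-00000 to the base name, so the file on disk will look like out72002-r-00000 rather than plain out72002.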
