MapReduce: job shows state DEFINE instead of RUNNING when using counters and ToolRunner in Hadoop

cu6pst1q · posted 2021-06-02 in Hadoop

I am trying to iterate with MapReduce. I have three jobs that run in sequence.

static enum UpdateCounter {
    INCOMING_ATTR
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int res = ToolRunner.run(conf, new Driver(), args);
    System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
    while (counter >= 0) {

        Configuration conf = getConf();
        /*
         * Job 1:
         */
        Job job1 = new Job(conf, "");
        // other configuration
        job1.setMapperClass(ID3ClsLabelMapper.class);
        job1.setReducerClass(ID3ClsLabelReducer.class);
        Path in = new Path(args[0]);
        Path out1 = new Path(CL);
        if (counter == 0) {
            FileInputFormat.addInputPath(job1, in);
        }
        else {
            FileInputFormat.addInputPath(job1, out5);
        }
        FileInputFormat.addInputPath(job1, in);
        FileOutputFormat.setOutputPath(job1, out1);
        job1.waitForCompletion(true);

        /*
         * Job 2:
         */
        Configuration conf2 = getConf();
        Job job2 = new Job(conf2, "");
        Path out2 = new Path(ANC);
        FileInputFormat.addInputPath(job2, in);
        FileOutputFormat.setOutputPath(job2, out2);
        job2.waitForCompletion(true);

        /*
         * Job 3:
         */
        Configuration conf3 = getConf();
        Job job3 = new Job(conf3, "");
        System.out.println("conf3");
        Path out5 = new Path(args[1]);
        if (fs.exists(out5)) {
            fs.delete(out5, true);
        }
        FileInputFormat.addInputPath(job3, out2);
        FileOutputFormat.setOutputPath(job3, out5);
        job3.waitForCompletion(true);
        FileInputFormat.addInputPath(job3, new Path(args[0]));
        FileOutputFormat.setOutputPath(job3, out5);
        job3.waitForCompletion(true);
        counter = job3.getCounters().findCounter(UpdateCounter.INCOMING_ATTR).getValue();
    }
    return 0;
}

Job 3 reducer:

public class ID3GSReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    public static final String UpdateCounter = null;
    NullWritable out = NullWritable.get();

    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            String v = val.toString();
            context.getCounter(UpdateCounter.INCOMING_ATTR).increment(1);
            context.write(out, new Text(v));
        }
    }
}
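
For comparison, this is how I understand a reducer normally increments an enum counter declared in the driver (a minimal sketch only; Driver stands for my driver class above, and ID3GSReducerSketch is just an illustrative name, not my real class):

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: assumes it lives in the same package as the Driver class,
// so the package-private UpdateCounter enum is visible.
public class ID3GSReducerSketch extends Reducer<NullWritable, Text, NullWritable, Text> {

    private final NullWritable out = NullWritable.get();

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            // Increment the enum counter defined in the driver class directly,
            // instead of going through a local String constant.
            context.getCounter(Driver.UpdateCounter.INCOMING_ATTR).increment(1);
            context.write(out, val);
        }
    }
}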

But when I run my code, it shows:

14/06/12 10:12:30 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/06/12 10:12:30 INFO mapred.JobClient:     Total committed heap usage (bytes)=1238630400
conf3
Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:116)
    at org.apache.hadoop.mapreduce.Job.getCounters(Job.java:491)

How do I make these jobs iterate? All three jobs should keep running until INCOMING_ATTR == 0, and the output of job3 (args[1]) should become the input of job1 for the second iteration. What do I need to change to achieve that, and am I doing something wrong? Please advise.
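
To make my intent concrete, below is a stripped-down sketch of the loop I am trying to build. The mapper/reducer settings, the intermediate directories, and the exact input of job2 are placeholders rather than my real code; what matters is the control flow: fresh Job objects each round, the counter read only after that round's job3 has run, and job3's output feeding job1 on the next round.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class IterativeDriver extends Configured implements Tool {

    static enum UpdateCounter {
        INCOMING_ATTR
    }

    @Override
    public int run(String[] args) throws Exception {
        Path originalInput = new Path(args[0]);
        Path finalOutput = new Path(args[1]);    // written by job3, read by job1 next round
        Path out1 = new Path("/tmp/iter/cl");    // placeholder for my CL path
        Path out2 = new Path("/tmp/iter/anc");   // placeholder for my ANC path
        long incomingAttr;
        int iteration = 0;

        do {
            Configuration conf = getConf();
            FileSystem fs = FileSystem.get(conf);

            // Job 1: original data on the first round, job3's output afterwards.
            Path job1Input = (iteration == 0) ? originalInput : finalOutput;
            if (fs.exists(out1)) {
                fs.delete(out1, true);
            }
            Job job1 = new Job(conf, "job1 iteration " + iteration);
            job1.setJarByClass(IterativeDriver.class);
            // job1.setMapperClass(ID3ClsLabelMapper.class);   // as in my real driver
            // job1.setReducerClass(ID3ClsLabelReducer.class);
            FileInputFormat.addInputPath(job1, job1Input);
            FileOutputFormat.setOutputPath(job1, out1);
            if (!job1.waitForCompletion(true)) {
                return 1;
            }

            // Job 2: consumes job1's output (simplified).
            if (fs.exists(out2)) {
                fs.delete(out2, true);
            }
            Job job2 = new Job(conf, "job2 iteration " + iteration);
            job2.setJarByClass(IterativeDriver.class);
            FileInputFormat.addInputPath(job2, out1);
            FileOutputFormat.setOutputPath(job2, out2);
            if (!job2.waitForCompletion(true)) {
                return 1;
            }

            // Job 3: writes args[1]; its counter decides whether to loop again.
            if (fs.exists(finalOutput)) {
                fs.delete(finalOutput, true);
            }
            Job job3 = new Job(conf, "job3 iteration " + iteration);
            job3.setJarByClass(IterativeDriver.class);
            FileInputFormat.addInputPath(job3, out2);
            FileOutputFormat.setOutputPath(job3, finalOutput);
            if (!job3.waitForCompletion(true)) {
                return 1;
            }

            // Read the counter only after this round's job3 has actually completed.
            incomingAttr = job3.getCounters()
                               .findCounter(UpdateCounter.INCOMING_ATTR)
                               .getValue();
            iteration++;
        } while (incomingAttr > 0);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new IterativeDriver(), args));
    }
}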
