MapReduce: job shows state DEFINE instead of RUNNING when using counters and ToolRunner in Hadoop

cu6pst1q · posted 2021-06-02 in Hadoop

I am trying to iterate with MapReduce. I have 3 jobs running in sequence:

    static enum UpdateCounter {
        INCOMING_ATTR
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new Driver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        while (counter >= 0) {
            Configuration conf = getConf();
            /*
             * Job 1:
             */
            Job job1 = new Job(conf, "");
            // other configuration
            job1.setMapperClass(ID3ClsLabelMapper.class);
            job1.setReducerClass(ID3ClsLabelReducer.class);
            Path in = new Path(args[0]);
            Path out1 = new Path(CL);
            if (counter == 0) {
                FileInputFormat.addInputPath(job1, in);
            } else {
                FileInputFormat.addInputPath(job1, out5);
            }
            FileInputFormat.addInputPath(job1, in);
            FileOutputFormat.setOutputPath(job1, out1);
            job1.waitForCompletion(true);
            /*
             * Job 2:
             */
            Configuration conf2 = getConf();
            Job job2 = new Job(conf2, "");
            Path out2 = new Path(ANC);
            FileInputFormat.addInputPath(job2, in);
            FileOutputFormat.setOutputPath(job2, out2);
            job2.waitForCompletion(true);
            /*
             * Job 3:
             */
            Configuration conf3 = getConf();
            Job job3 = new Job(conf3, "");
            System.out.println("conf3");
            Path out5 = new Path(args[1]);
            if (fs.exists(out5)) {
                fs.delete(out5, true);
            }
            FileInputFormat.addInputPath(job3, out2);
            FileOutputFormat.setOutputPath(job3, out5);
            job3.waitForCompletion(true);
            FileInputFormat.addInputPath(job3, new Path(args[0]));
            FileOutputFormat.setOutputPath(job3, out5);
            job3.waitForCompletion(true);
            counter = job3.getCounters().findCounter(UpdateCounter.INCOMING_ATTR).getValue();
        }
        return 0;
    }

Job 3 reducer:

    public class ID3GSReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        public static final String UpdateCounter = null;
        NullWritable out = NullWritable.get();

        public void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                String v = val.toString();
                context.getCounter(UpdateCounter.INCOMING_ATTR).increment(1);
                context.write(out, new Text(v));
            }
        }
    }

But it shows:

    14/06/12 10:12:30 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
    14/06/12 10:12:30 INFO mapred.JobClient: Total committed heap usage (bytes)=1238630400
    conf3
    Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
        at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:116)
        at org.apache.hadoop.mapreduce.Job.getCounters(Job.java:491)

How do I iterate over the above jobs?
All 3 jobs should keep running until INCOMING_ATTR == 0, and job3's output (args[1]) should become job1's input for the second iteration. What should I change to achieve this?
Please advise.
Am I doing something wrong?
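One likely cause, assuming the code runs as posted: a Hadoop `Job` instance can be submitted only once, and here `job3` gets a second `addInputPath`/`setOutputPath`/`waitForCompletion` sequence after it has already completed, before `getCounters()` is called, so the object is no longer in a valid state. `out5` is also used for job1 before it is declared in the job3 block, and the reducer's `public static final String UpdateCounter = null;` field obscures the `UpdateCounter` enum and can be removed. Below is a sketch (not tested on a cluster) of a restructured `run()` that creates each `Job` fresh per iteration, submits it exactly once, and reads the counter afterwards. The mapper/reducer classes and the `CL`/`ANC` paths come from the question; the job names, the `do/while` shape, the `firstPass` flag, and the `fs.delete` calls on stale output directories are my assumptions.

```java
// Sketch: a possible restructuring of Driver.run(). Each Job object is
// created, configured, and submitted once; the counter is read only after
// waitForCompletion() has returned for that same instance.
@Override
public int run(String[] args) throws Exception {
    Path in = new Path(args[0]);
    Path finalOut = new Path(args[1]);               // job 3's output
    FileSystem fs = FileSystem.get(getConf());
    boolean firstPass = true;
    long counter;
    do {
        // Job 1: reads args[0] on the first pass, job 3's output afterwards.
        Job job1 = new Job(getConf(), "job1");
        job1.setMapperClass(ID3ClsLabelMapper.class);
        job1.setReducerClass(ID3ClsLabelReducer.class);
        Path out1 = new Path(CL);
        fs.delete(out1, true);                       // clear stale output dir
        FileInputFormat.addInputPath(job1, firstPass ? in : finalOut);
        FileOutputFormat.setOutputPath(job1, out1);
        if (!job1.waitForCompletion(true)) return 1;

        // Job 2: same input wiring as in the question.
        Job job2 = new Job(getConf(), "job2");
        Path out2 = new Path(ANC);
        fs.delete(out2, true);
        FileInputFormat.addInputPath(job2, in);
        FileOutputFormat.setOutputPath(job2, out2);
        if (!job2.waitForCompletion(true)) return 1;

        // Job 3: submit once, then read the counter from the finished job.
        Job job3 = new Job(getConf(), "job3");
        fs.delete(finalOut, true);
        FileInputFormat.addInputPath(job3, out2);
        FileOutputFormat.setOutputPath(job3, finalOut);
        if (!job3.waitForCompletion(true)) return 1;
        counter = job3.getCounters()
                      .findCounter(UpdateCounter.INCOMING_ATTR).getValue();
        firstPass = false;
    } while (counter > 0);                           // stop once INCOMING_ATTR == 0
    return 0;
}
```

Deleting each output directory at the top of the iteration also avoids the `FileAlreadyExistsException` that the output formats throw on the second pass.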
