使用hadoop计数器-多个作业

t2a7ltrp  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(453)

我正在使用hadoop进行一个mapreduce项目。我目前有3个连续的工作。
我想使用hadoop计数器,但问题是我想在第一个作业中进行实际计数,但在第三个作业的reducer中访问计数器值。
我怎样才能做到这一点?我应该在哪里定义 enum ? 我需要通过第二份工作吗?这也将有助于看到一些代码的例子这样做,因为我还没有找到任何东西。
注意:我使用的是hadoop2.7.2
编辑:我已经尝试了这里介绍的方法,但没有成功。我的情况不同,因为我想从不同的作业访问计数器(不是从mapper到reducer)。
我想做的:第一件事:

public static void startFirstJob(String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "wordCount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCountMapper.class);
    job.setCombinerClass(WordCountReducer.class);
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
}

在其他类中定义了计数器枚举:

public class CountersClass {
    public static enum N_COUNTERS {
        SOMECOUNT
    }
}

正在尝试读取计数器:

Cluster cluster = new Cluster(context.getConfiguration());
Job job = cluster.getJob(JobID.forName("wordCount"));
Counters counters = job.getCounters();
CountersClass.N_COUNTERS mycounter = CountersClass.N_COUNTERS.valueOf("SOMECOUNT");
Counter c1 = counters.findCounter(mycounter);
long N_Count = c1.getValue();
cpjpxq1n

cpjpxq1n1#

经典的解决方案是将作业的计数器值放入需要访问它的后续作业的配置中:
因此,请确保在计数作业Map器/减速机中正确增加它:

context.getCounter(CountersClass.N_COUNTERS.SOMECOUNT).increment(1);

然后在计算作业完成时间后:

job.waitForCompletion(true);

Counter someCount = job.getCounters().findCounter(CountersClass.N_COUNTERS.SOMECOUNT);

//put counter value into conf object of the job where you need to access it
//you can choose any name for the conf key really (i just used counter enum name here)
job2.getConfiguration().setLong(CountersClass.N_COUNTERS.SOMECOUNT.name(), someCount.getValue());

下一步是在另一个作业的mapper/reducer中访问它。只需重写setup(),例如:

private long someCount;

@Override
protected void setup(Context context) throws IOException,
    InterruptedException {
  super.setup(context);
  this.someCount  = context.getConfiguration().getLong(CountersClass.N_COUNTERS.SOMECOUNT.name(), 0));
}
vbkedwbf

vbkedwbf2#

在第一个作业结束时获取计数器,并将其值写入文件,然后在后续作业中读取。如果要从reducer中读取它,请将其写入hdfs;如果要在应用程序代码中读取和初始化,请将其写入本地文件。 Counters counters = job.getCounters(); Counter c1 = counters.findCounter(COUNTER_NAME); System.out.println(c1.getDisplayName()+":"+c1.getValue()); 读写文件是基础教程的一部分。

相关问题