I am currently learning Hadoop programming and trying to write a program that processes two input sources in a single mapper. The task is similar to a map-side join problem.
So I first tried the distributed cache, but it did not work well. I then tried using the setup() method instead. It runs fine in local execution mode on a single PC, but it does not work in a cluster environment.
I don't know the exact reason.
Is there any cluster configuration required when using the setup() method?
Below is part of my code. This is the job driver that performs the iterative work.
public int run(String[] arg0) throws Exception {
    int iteration = 1;
    Configuration conf = new Configuration();
    Path in = new Path(arg0[0]);
    Path out = new Path(arg0[1] + "iteration_" + iteration);
    conf.set("conf.threshold", arg0[2]);

    // first job
    Job job = new Job(conf, "Test");
    job.setJarByClass(getClass());
    job.setMapperClass(FirstMap.class);
    job.setReducerClass(FirstReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.waitForCompletion(true);

    // start second job
    // long counter = 4;//job.getCounters().findCounter(SecondReduce.Counter.CONVERGED).getValue();
    String PriorPath = out.toString();
    boolean Updates = true;
    while (Updates) {
        iteration++;
        conf = new Configuration();
        Path out2 = new Path(arg0[1] + "iteration_" + iteration);
        conf.set("prior.job.out", PriorPath);
        conf.set("conf.iteration", iteration + "");

        job = new Job(conf, "job" + iteration);
        job.setJarByClass(getClass());
        job.setMapperClass(SecondMap.class);
        job.setReducerClass(SecondReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out2);
        job.waitForCompletion(true);

        PriorPath = out2.toString();
        // iterate until the convergence counter stops incrementing
        long counter = job.getCounters().findCounter(Counter.CONVERGED).getValue();
        Updates = (counter > 0);
        System.out.println("counter : " + counter);
    }
    return 0;
}
Also, the mapper that includes the setup() method is shown below.
public static class SecondMap extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable one = new IntWritable(1);
    Vector<String> Vec = new Vector<String>();
    Vector<String> Gen = new Vector<String>();
    int iteration;

    @Override
    public void setup(Context context) throws IOException,
            InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        Path Cand = new Path(conf.get("prior.job.out"));
        // iteration = Integer.parseInt(conf.get("conf.iteration"));
        String iter = conf.get("conf.iteration");
        iteration = Integer.parseInt(iter);
        try {
            // read the prior job's output into memory
            FileSystem fs = FileSystem.get(conf);
            FileStatus[] status = fs.listStatus(Cand);
            for (int i = 0; i < status.length; i++) {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(fs.open(status[i].getPath())));
                String line = br.readLine();
                while (line != null) {
                    System.out.println(line);
                    Vec.add(line);
                    line = br.readLine();
                }
                br.close();
            }
        } catch (Exception e) {
            System.out.println("File not found");
        }
        Gen = GenerateCandidate(Vec, iteration);
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // something with CandGen
    }
}
Does anyone have experience with this problem?
1 Answer
The setup() method is called only once per mapper or reducer task. So if 10 mappers or reducers are spawned for a job, it will be called once for each of them. The general guideline for what to put in this method is anything that needs to be done only once, for example getting the path of files in the distributed cache or reading parameters passed to the mappers and reducers. The cleanup() method works the same way, running once at the end of each task.
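For illustration, here is a minimal sketch of that lifecycle, assuming the new org.apache.hadoop.mapreduce API used in the question. The parameter name "conf.threshold" is borrowed from the driver above; the class name LifecycleMapper and the thresholding logic are hypothetical.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LifecycleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private int threshold;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Runs once per mapper task, before any map() call: a good place to
        // read job parameters or load side data into memory.
        threshold = Integer.parseInt(context.getConfiguration().get("conf.threshold", "0"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Runs once per input record.
        if (value.getLength() > threshold) {
            context.write(value, new IntWritable(1));
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Runs once per mapper task, after the last map() call: close any
        // resources that were opened in setup() here.
    }
}

One consequence for your case: setup() executes in the task JVM on whatever cluster node runs the task, so anything it reads, such as the prior job's output directory, must be reachable from that node through the cluster filesystem rather than a path that only exists on your local machine.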