Hadoop setup() method in a Mapper

jfgube3f · posted 2021-06-03 in Hadoop
Follow (0) | Answers (1) | Views (318)

I am currently learning Hadoop programming and writing a program that processes two input sources in a single mapper. The task is similar to a map-side join.
I first tried the distributed cache, but it did not work well, so I switched to the setup() method. This runs fine in local (standalone) mode on a single PC, but it does not work in a cluster environment.
I do not know the exact reason.
Is there any cluster configuration required when using the setup() method?
Below is part of my code. This is the job driver that runs the iterative jobs.

public int run(String[] arg0) throws Exception {

    int iteration = 1;

    Configuration conf = new Configuration();

    Path in = new Path(arg0[0]);
    Path out = new Path(arg0[1] + "iteration_" + iteration);

    conf.set("conf.threshold", arg0[2]);

    // first job
    Job job = new Job(conf, "Test");
    job.setJarByClass(getClass());
    job.setMapperClass(FirstMap.class);
    job.setReducerClass(FirstReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.waitForCompletion(true);

    // iterative jobs: each pass reads the previous job's output path
    // from the configuration and stops once the CONVERGED counter is 0
    String priorPath = out.toString();

    boolean updates = true;
    while (updates) {

        iteration++;
        conf = new Configuration();

        Path out2 = new Path(arg0[1] + "iteration_" + iteration);

        conf.set("prior.job.out", priorPath);
        conf.set("conf.iteration", Integer.toString(iteration));

        job = new Job(conf, "job" + iteration);
        job.setJarByClass(getClass());
        job.setMapperClass(SecondMap.class);
        job.setReducerClass(SecondReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out2);
        job.waitForCompletion(true);

        priorPath = out2.toString();

        long counter = job.getCounters().findCounter(Counter.CONVERGED).getValue();
        updates = (counter > 0);
        System.out.println("counter : " + counter);
    }

    return 0;
}
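For reference, the driver reads Counter.CONVERGED, whose definition is not shown in the post. Here is a minimal sketch of how such a counter is usually declared and incremented; the enum name is taken from the code above, and its placement inside the reducer is an assumption:

public static enum Counter {
    CONVERGED
}

// Inside SecondReduce.reduce(), an update would be signalled like this:
// context.getCounter(Counter.CONVERGED).increment(1);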

The mapper that contains the setup() method is shown below.

public static class SecondMap extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable one = new IntWritable(1);
    Vector<String> Vec = new Vector<String>();
    Vector<String> Gen = new Vector<String>();
    int iteration;

    @Override
    public void setup(Context context) throws IOException,
            InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        Path Cand = new Path(conf.get("prior.job.out"));
        iteration = Integer.parseInt(conf.get("conf.iteration"));

        try {
            // read every file of the previous job's output into memory
            FileSystem fs = FileSystem.get(conf);
            FileStatus[] status = fs.listStatus(Cand);
            for (int i = 0; i < status.length; i++) {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(fs.open(status[i].getPath())));
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                    Vec.add(line);
                }
                br.close();
            }
        } catch (Exception e) {
            // note: swallowing the exception hides the real error; on a
            // cluster the task log will only ever show this message
            System.out.println("File not found");
        }
        Gen = GenerateCandidate(Vec, iteration);
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // something with CandGen

    }
}
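One common reason this pattern fails on a cluster but not locally is that listStatus() on a job output directory also returns bookkeeping entries such as _SUCCESS (and, on older Hadoop versions, a _logs directory), and the catch block above swallows the resulting exception. Below is a minimal sketch of a more defensive setup(), keeping the same prior.job.out convention; the file-skipping logic is an assumption about the failure, not a confirmed diagnosis:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Path cand = new Path(conf.get("prior.job.out"));
    // resolve the FileSystem from the path itself, so an HDFS path works
    // the same way on the cluster as a local path does in standalone mode
    FileSystem fs = cand.getFileSystem(conf);

    for (FileStatus status : fs.listStatus(cand)) {
        // skip directories and bookkeeping files such as _SUCCESS
        if (status.isDirectory() || status.getPath().getName().startsWith("_")) {
            continue;
        }
        BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(status.getPath())));
        try {
            String line;
            while ((line = br.readLine()) != null) {
                Vec.add(line);
            }
        } finally {
            br.close();  // avoid leaking one HDFS stream per file
        }
    }
    // letting IO failures propagate (instead of catching them) makes the
    // task log show the real cluster-side error instead of "File not found"
    iteration = Integer.parseInt(conf.get("conf.iteration"));
    Gen = GenerateCandidate(Vec, iteration);
}

Another thing worth verifying: prior.job.out must be a path visible to every node (e.g. on HDFS). A path on the client's local disk exists in standalone mode but not on the machines where the map tasks actually run.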

Does anyone have experience with this issue?


thtygnil 1#

The setup() method is called only once per mapper or reducer task. So if 10 mappers or reducers are spawned for a job, it is called once for each of them. As a general guideline, any work that needs to be done only once belongs in this method: for example, getting the path of a file in the distributed cache, or reading parameters passed from the driver to the mappers and reducers. The cleanup() method is analogous, running once at the end of each task.
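To make the lifecycle concrete, here is a minimal sketch (the class and parameter names are illustrative, not from the original post) showing where setup(), map(), and cleanup() fit and how a driver-side parameter is read once per task:

public static class ExampleMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private int threshold;  // parameter fetched once in setup()

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        // runs once per task: read a job parameter set in the driver
        // via conf.set("conf.threshold", ...)
        threshold = context.getConfiguration().getInt("conf.threshold", 0);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // runs once per input record; may rely on fields from setup()
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // runs once at the end of the task, e.g. to flush buffered output
    }
}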
