Hadoop 链式 Mapper(ChainMapper)

kd3sttzy  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(332)

这个问题在这里已经有答案了

Hadoop MapReduce:在 MapReduce 作业中链接多个 Mapper 的驱动程序(4个答案)
去年已关闭。
我对 Hadoop 还比较陌生,正在尝试弄清楚如何使用 ChainMapper 和 ChainReducer 以编程方式链接作业(多个 Mapper 加 Reducer)。我找到了一些零散的示例,但没有一个是完整且可运行的。
我当前的测试代码如下:

/**
 * Test driver from the question: a {@code Tool} that tries to chain two mappers
 * ({@link Map}, {@link Map2}) and one reducer ({@link Reduce}) in a single job
 * via {@code ChainMapper} / {@code ChainReducer}, using the
 * {@code JobConf} / {@code OutputCollector} style API.
 *
 * This is the code that fails with "Error in configuring object"; it is kept
 * exactly as posted.
 */
public class ChainJobs extends Configured implements Tool {

/**
 * First mapper in the chain: tokenizes each input line with
 * {@code StringTokenizer} and emits {@code (token, 1)} for every token.
 */
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    // Constant count of one emitted with every token.
    private final static IntWritable one = new IntWritable(1);
    // Reused output key; overwritten for each token before it is collected.
    private Text word = new Text();

    /**
     * Emits {@code (token, 1)} for each token of {@code value}; {@code key} is not read.
     *
     * @throws IOException declared for failures while collecting output
     */
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

/**
 * Second mapper in the chain: takes {@code (Text, IntWritable)} pairs and emits
 * {@code (token + "Justatest", 1)} for each token of the value's string form.
 */
public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    // Constant count of one emitted with every token.
    private final static IntWritable one = new IntWritable(1);
    // Reused output key; overwritten for each token before it is collected.
    private Text word = new Text();

    /**
     * Tokenizes {@code value.toString()} and emits each token with the suffix
     * "Justatest" and a count of one.
     *
     * NOTE(review): the text being tokenized is the IntWritable value (the count),
     * not the incoming key, and the key is never read - presumably the key was
     * intended here; confirm.
     *
     * @throws IOException declared for failures while collecting output
     */
    @Override
    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().concat("Justatest"));
            output.collect(word, one);
        }
    }
}

/**
 * Reducer: adds up all counts received for a key and emits {@code (key, sum)}.
 */
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums {@code values} into a local total and collects it under {@code key}.
     *
     * @throws IOException declared for failures while collecting output
     */
    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

/**
 * Builds and runs the chained job.
 *
 * @param args args[0] is used as the input path, args[1] as the output path
 * @return always 0 once {@code JobClient.runJob} returns
 *
 * NOTE(review): no jar or driver class is associated with the JobConf here
 * (e.g. via a JobConf constructor taking a class, or setJarByClass); this may
 * be related to the task-side "Error in configuring object" failure - confirm.
 * NOTE(review): JobClient.runJob is documented to throw IOException, which this
 * signature does not declare - confirm against the Hadoop version in use.
 */
@Override
public int run(String[] args)  {

    Configuration conf = getConf();
    JobConf job = new JobConf(conf);

    job.setJobName("TestforChainJobs");
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Each chain element gets its own JobConf, created with the boolean argument false.
    JobConf map1Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);

    // Map2's declared input types (Text, IntWritable) match Map's declared output types.
    JobConf map2Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);

    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobClient.runJob(job);
    return 0;

     }

}

/**
 * Entry point: runs {@code ChainJobs} through {@code ToolRunner} with a fresh
 * {@code Configuration} and exits with the status that run returns.
 *
 * NOTE(review): as pasted, this method appears after the closing brace of the
 * ChainJobs class, i.e. outside any class; it has to be moved inside the class
 * body for the file to compile.
 */
public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
    System.exit(res);
}

但它运行失败,报错如下:

MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost\.localdomain" ERROR="java\.lang\.RuntimeException: Error in configuring object
    at org\.apache\.hadoop\.util\.ReflectionUtils\.setJobConf(ReflectionUtils\.java:106)
    at org\.apache\.hadoop\.util\.ReflectionUtils\.setConf(ReflectionUtils\.java:72)
    at org\.apache\.hadoop\.util\.ReflectionUtils\.newInstance(ReflectionUtils\.java:130)
    at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:389)
    at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:327)
    at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:268)
    at java\.security\.AccessController\.doPrivileged(Native Method)
    at javax\.security\.auth\.Subject\.doAs(Subject\.java:396)

如能提供任何提示,或一个非常简单的可运行示例,将不胜感激。

slsn1g29

slsn1g291#

我编写了一个基于 ChainMapper 的 WordCount 作业。代码使用新的 API(org.apache.hadoop.mapreduce)编写,并且运行良好 :)

import java.io.IOException;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//implementing CHAIN MAPREDUCE without using custom format

//SPLIT MAPPER
/**
 * First mapper of the chain: breaks each input line into words and emits
 * {@code (word, 1)} for every non-empty word.
 */
class SplitMapper extends Mapper<Object,Text,Text,IntWritable>
{
    /** Constant count of one attached to every emitted word. */
    private final IntWritable dummyValue=new IntWritable(1);

    /**
     * Splits {@code value} on runs of whitespace and writes one
     * {@code (word, 1)} pair per word; {@code key} is not read.
     *
     * @param key     input key, ignored
     * @param value   one line of input text
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
        // Split on any run of whitespace: splitting on a single space turns
        // consecutive spaces into empty "words" and leaves tabs inside words.
        for(String token:value.toString().split("\\s+"))
        {
            // A blank line, or a line starting with whitespace, still yields
            // one leading empty token; it is not a word, so skip it.
            if(token.isEmpty())
            {
                continue;
            }
            context.write(new Text(token), dummyValue);
        }
    }
}

//UPPER CASE MAPPER
/**
 * Second mapper of the chain: upper-cases the incoming key and passes the
 * value through unchanged.
 */
class UpperCaseMapper extends Mapper<Text,IntWritable,Text,IntWritable>
{
    /**
     * Writes {@code (upper-cased key, value)}.
     *
     * @param key     word produced by the previous mapper
     * @param value   count attached to the word, forwarded as-is
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(Text key,IntWritable value,Context context)throws IOException,InterruptedException{
        // Locale.ROOT keeps the result independent of the JVM default locale,
        // so every task produces the same key for the same word.
        String upper=key.toString().toUpperCase(Locale.ROOT);
        context.write(new Text(upper), value);
    }
}

//ChainMapReducer
/**
 * Reducer for the chained word count: adds up the counts received for one key
 * and emits {@code (key, total)}.
 */
class ChainMapReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
    /**
     * Sums {@code values} for a single {@code key}.
     *
     * @param key     the word being counted
     * @param values  counts collected for that word
     * @param context sink for the emitted {@code (key, total)} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(Text key,Iterable<IntWritable>values,Context context)throws IOException,InterruptedException{
        // The total must be local to each call: kept as an instance field it is
        // never reset, so every key would report a running total of all keys
        // seen so far instead of its own count.
        int sum=0;
        for(IntWritable value:values)
        {
            sum+=value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
public class FirstClass extends Configured implements Tool{
    static Configuration cf;
    public int run (String args[])throws IOException,InterruptedException,ClassNotFoundException{
        cf=new Configuration();

        //bypassing the GenericOptionsParser part and directly running into job declaration part
        Job j=Job.getInstance(cf);

        /**************CHAIN MAPPER AREA STARTS********************************/
        Configuration splitMapConfig=new Configuration(false);
        //below we add the 1st mapper class under ChainMapper Class
        ChainMapper.addMapper(j, SplitMapper.class, Object.class, Text.class, Text.class, IntWritable.class, splitMapConfig);

        //configuration for second mapper
        Configuration upperCaseConfig=new Configuration(false);
        //below we add the 2nd mapper that is the lower case mapper to the Chain Mapper class
        ChainMapper.addMapper(j, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperCaseConfig);
        /**************CHAIN MAPPER AREA FINISHES********************************/

        //now proceeding with the normal delivery
        j.setJarByClass(FirstClass.class);
        j.setCombinerClass(ChainMapReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        Path p=new Path(args[1]);

        //set the input and output URI
        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, p);
        p.getFileSystem(cf).delete(p, true);
        return j.waitForCompletion(true)?0:1;
    }
    public static void main(String args[])throws Exception{
        int res=ToolRunner.run(cf, new FirstClass(), args);
        System.exit(res);
    }
}

输出部分如下所示(注意:这里的数值随键的顺序单调递增,是跨键累加的运行总和,而不是每个单词各自的出现次数):

A       619
ACCORDING       636
ACCOUNT 638
ACROSS? 655
ADDRESSES       657
AFTER   674
AGGREGATING,    687
AGO,    704
ALL     721
ALMOST  755
ALTERING        768
AMOUNT  785
AN      819
ANATOMY 820
AND     1198
ANXIETY 1215
ANY     1232
APACHE  1300
APPENDING       1313
APPLICATIONS    1330
APPLICATIONS.   1347
APPLICATIONS.�        1364
APPLIES 1381
ARCHITECTURE,   1387
ARCHIVES        1388
ARE     1405
AS      1422
BASED   1439

你可能会看到一些特殊或多余的字符,因为我没有做任何清理来去除标点符号。我只是专注于让链式 Mapper 正常工作。谢谢 :)

相关问题