Implementing a join in Hadoop with Java

nnsrf1az posted on 2021-06-03 in Hadoop

I have just started working with Hadoop and I am trying to implement a join in Java, either map-side or reduce-side. I chose a reduce-side join because it is supposed to be easier to implement. I know Java is not the best choice for joins, aggregations, and so on, and that Hive or Pig, which I have already used, would be better suited. However, I am working on a research project and have to use all three for comparison.
Anyway, I have two input files with different structures. One is key | value, the other is key | value1;value2;value3;value4. One record from each input file looks like this:

Input 1: 1;2010-01-10T00:00:01
Input 2: 1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59

I followed the example from Hadoop: The Definitive Guide, but it does not work for me. I am posting my Java files here so you can see what is going wrong.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Reducer;
//CDR and Attribute (providing CDR_TAG and ATT_TAG) are assumed to be in the same package

public class LookupReducer extends Reducer<TextPair,Text,Text,Text> {

private String result = "";
private String msisdn;
private String attribute, product;
private long trans_dt_long, start_dt_long, end_dt_long; 
private String trans_dt, start_dt, end_dt; 

@Override
public void reduce(TextPair key, Iterable<Text> values, Context context) 
        throws IOException, InterruptedException {

     context.progress();
    //value without key to remember

    Iterator<Text> iter = values.iterator();

    for (Text val : values) {

        Text recordNoKey = val;     //new Text(iter.next());

        String valSplitted[] = recordNoKey.toString().split(";");

        //if the input is coming from CDR set corresponding values
        if(key.getSecond().toString().equals(CDR.CDR_TAG))
        {
            trans_dt = recordNoKey.toString();
            trans_dt_long = dateToLong(recordNoKey.toString());
        }
        //if the input is coming from Attributes set corresponding values
        else if(key.getSecond().toString().equals(Attribute.ATT_TAG))
        {
            attribute = valSplitted[0];
            product = valSplitted[1];
            start_dt = valSplitted[2];
            start_dt_long = dateToLong(valSplitted[2]);
            end_dt = valSplitted[3];
            end_dt_long = dateToLong(valSplitted[3]);
        }

        Text record = val;  //iter.next();
        //System.out.println("RECORD: " + record);
        Text outValue = new Text(recordNoKey.toString() + ";" + record.toString());

        //emit the joined record once the CDR timestamp falls inside the attribute's validity interval
        if(start_dt_long < trans_dt_long && trans_dt_long < end_dt_long)
        {
            //concat output columns
            result = attribute + ";" + product + ";" + trans_dt;

            context.write(key.getFirst(), new Text(result));
            System.out.println("KEY: " + key);
        }
    }
}

private static long dateToLong(String date){
    //the input timestamps have the ISO form yyyy-MM-dd'T'HH:mm:ss, so the pattern must contain the literal 'T',
    //otherwise parse() throws a ParseException and parsedDate stays null
    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
    Date parsedDate = null;
    try {
        parsedDate = formatter.parse(date);
    } catch (ParseException e) {
        e.printStackTrace();
    }
    long dateInLong = parsedDate.getTime();

    return dateInLong;

}

public static class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair(){
        set(new Text(), new Text());
    }

    public TextPair(String first, String second){
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second){
        set(first, second);
    }

    public void set(Text first, Text second){
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public int hashCode(){
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o){
        if(o instanceof TextPair)
        {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString(){
        return first + ";" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if(cmp != 0)
            return cmp;
        return second.compareTo(tp.second);
    }

    public static class FirstComparator extends WritableComparator {

        protected FirstComparator(){
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable comp1, WritableComparable comp2){
            TextPair pair1 = (TextPair) comp1;
            TextPair pair2 = (TextPair) comp2;
            int cmp = pair1.getFirst().compareTo(pair2.getFirst());

            if(cmp != 0)
                return cmp;

            return -pair1.getSecond().compareTo(pair2.getSecond());
        }
    }

    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() 
        {
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable comp1, WritableComparable comp2)
        {
            TextPair pair1 =  (TextPair) comp1;
            TextPair pair2 =  (TextPair) comp2;

            return pair1.compareTo(pair2);
        }
    }

}

}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//imports of the project's own TextPair, LookupReducer, CDRMapper and AttributeMapper classes are omitted (package not shown)

public class Joiner  extends Configured implements Tool {

public static final String DATA_SEPERATOR = ";";                                     //Define the symbol for separating the output data
public static final String NUMBER_OF_REDUCER = "1";                                  //Define the number of reducer tasks to use
public static final String COMPRESS_MAP_OUTPUT = "true";                             //set COMPRESS_MAP_OUTPUT = "true" to compress the map output, otherwise set it to "false"
public static final String USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec"; //set the codec used for the map output compression
public static final boolean JOB_RUNNING_LOCAL = true;                                //set JOB_RUNNING_LOCAL = true when running the Hadoop job on your local machine,
                                                                                      //and JOB_RUNNING_LOCAL = false when running it on the Telefonica Cloud
public static final String OUTPUT_PATH = "/home/hduser";                             //set the folder where the output is saved; only needed if JOB_RUNNING_LOCAL = false

public static class KeyPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        System.out.println("numPartitions: " + numPartitions);
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

private static Configuration hadoopconfig() {
    Configuration conf = new Configuration();

    conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR);
    conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT);
    //conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC);
    conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER);
    return conf;
}

@Override
public int run(String[] args) throws Exception {
    if ((args.length != 3) && (JOB_RUNNING_LOCAL)) {

        System.err.println("Usage: Lookup <CDR-inputPath> <Attribute-inputPath> <outputPath>");
        System.exit(2);
    }

    //starting the Hadoop job
    Configuration conf = hadoopconfig();
    Job job = new Job(conf, "Join cdrs and attributes");
    job.setJarByClass(Joiner.class);

    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class);
    //FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //expecting a folder instead of a file

    if(JOB_RUNNING_LOCAL)
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
    else
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

    job.setPartitionerClass(KeyPartitioner.class);
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    job.setReducerClass(LookupReducer.class);

    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
}

 public static void main(String[] args) throws Exception {

     int exitCode = ToolRunner.run(new Joiner(), args);
     System.exit(exitCode);

 }
}
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//TextPair (defined inside LookupReducer) is assumed to be accessible here

public class Attribute {

public static final String ATT_TAG = "1";

public static class AttributeMapper 
extends Mapper<LongWritable, Text, TextPair, Text>{

    private static Text values = new Text();
    //private Object output = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //partition the input line by the separator semicolon   
        String[] attributes = value.toString().split(";");
        String valuesInString = "";

        if(attributes.length != 5)
            System.err.println("Input column number not correct. Expected 5, provided " + attributes.length
                    + "\n" + "Check the input file");
        if(attributes.length == 5)
        {
            //setting the values with the input values read above
            valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4];
            values.set(valuesInString);
            //writing out the key and value pair
            context.write(new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values);
        }
    }
}   

}

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//TextPair (defined inside LookupReducer) is assumed to be accessible here

public class CDR    {

public static final String CDR_TAG = "0";

public static class CDRMapper 
        extends Mapper<LongWritable, Text, TextPair, Text>{

    private static Text values = new Text();
    private Object output = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //partition the input line by the separator semicolon
        String[] cdr = value.toString().split(";");

        //setting the values with the input values read above
        values.set(cdr[1]);
        //output = CDR_TAG + cdr[1];

        //writing out the key and value pair
        context.write(new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values);
    }

}

}

I would be really glad if someone could at least post a link to a tutorial or a simple example implementing such a join. I have searched a lot, but either the code was incomplete or the explanation was insufficient.


dzjeubhm1#

To be honest, I don't know what your code is trying to do, but that is probably because I would do it differently and I am not familiar with the APIs you are using.
I would start from scratch as follows:
Write one mapper that reads one of the files. For every line read, it writes a key/value pair to the context. The key is a Text created from the join key, and the value is another Text created by concatenating "1" with the whole input record.
Write another mapper for the other file. It works like the first mapper, but its value is a Text created by concatenating "2" with the whole input record.
Write a reducer that performs the join. The reduce() method receives all the records written for a particular key. You can tell which input file a value came from (and therefore which data format the record has) by checking whether it starts with "1" or "2". Once you know whether you have one record type, the other, or both, you can write whatever logic is needed to merge the data from the one or two records. (A rough sketch of this approach follows below.)
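A minimal sketch of what those tagging mappers and the joining reducer could look like, assuming semicolon-separated input as in the question; the class and field names (TagJoinExample, FileOneMapper, FileTwoMapper, TagJoinReducer) are made up for illustration, not taken from the original post:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TagJoinExample {

    //tags every record of the "key;value" file (input 1) with the prefix "1"
    public static class FileOneMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String record = line.toString();
            String joinKey = record.split(";", 2)[0];   //the first field is the join key
            context.write(new Text(joinKey), new Text("1" + record));
        }
    }

    //tags every record of the "key;value1;...;value4" file (input 2) with the prefix "2"
    public static class FileTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String record = line.toString();
            String joinKey = record.split(";", 2)[0];
            context.write(new Text(joinKey), new Text("2" + record));
        }
    }

    //receives all tagged values for one key, tells them apart by their tag and merges them
    public static class TagJoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String fileOneRecord = null;
            List<String> fileTwoRecords = new ArrayList<String>();

            for (Text value : values) {
                String tagged = value.toString();
                if (tagged.startsWith("1")) {
                    fileOneRecord = tagged.substring(1);     //strip the tag again
                } else if (tagged.startsWith("2")) {
                    fileTwoRecords.add(tagged.substring(1));
                }
            }

            //emit one joined line per pair of matching records
            if (fileOneRecord != null) {
                for (String fileTwoRecord : fileTwoRecords) {
                    context.write(key, new Text(fileOneRecord + ";" + fileTwoRecord));
                }
            }
        }
    }
}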
By the way, you can use the MultipleInputs class to configure the multiple mappers in your job/driver class, as in the snippet below.
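A hedged sketch of the corresponding driver configuration, reusing the sketch classes above; the driver class name and the use of positional path arguments are assumptions for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TagJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "tag-based reduce-side join");
        job.setJarByClass(TagJoinDriver.class);

        //one mapper per input file, both feeding the same reducer
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TagJoinExample.FileOneMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TagJoinExample.FileTwoMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setReducerClass(TagJoinExample.TagJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

You would then run it with something like hadoop jar join.jar TagJoinDriver <input1> <input2> <output> (paths hypothetical).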
