java hadoop mapreduce链接作业

6ss1mwsb  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(383)

我有一些代码,可以正确选出源节点(src)和最高权重,但我似乎无法把目标(tgt)列也带进输出。有人能给我指出正确的方向吗?我以前从未使用过Java。我认为reducer函数需要返回一个元组。那么,mapper函数中的变量target是否也需要是这个元组?
所需输出:每行包含一个节点id,后跟一个制表符(\t)和所需的“tgt,weight”元组。该元组对应权重最高的tgt;如果权重相同,则返回编号最小的tgt。
输入

src        tgt        weight

1        110        3

1        200        1

20        150        30

10        110        10

11        130        15

11        200        67

1        70        3

预期输出

1        70,3

20        150,30

10        110,10

11        200,67

当前输出(需要作为tuple添加到tgt列中)

1        3

20        30

10        10

11        67
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Q1 {

  public static class TargetMapper extends Mapper<Object, Text, Text, IntWritable> {

      private Text target = new Text();
      public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                target.set(edge[0]);
                context.write(target, new IntWritable(Integer.parseInt(edge[2])));
            }
        }

    }

  public static class EmailsReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

      private IntWritable totalCount = new IntWritable();  
      public void reduce(Text key, Iterable<IntWritable> targets, Context context) throws IOException, InterruptedException{

            int max = 0;

            for (IntWritable target : targets)  {
                if(target.get() > max || max ==0) {
                    max = target.get();
                }
            }

            totalCount.set(max);

            context.write(key, totalCount);

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");

        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
ftf50wuq

ftf50wuq1#

您需要的是自定义输出。要实现这一点,请尝试实现一个自定义的 WritableComparable。您可能还需要调整自己的逻辑,使其符合您的需求。
比如:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * Writable pair of a target node id and an edge weight.
 *
 * <p>Serialized as the target followed by the weight. Ordering compares the
 * target first and the weight second. The textual form is {@code tgt,weight}.
 */
public class MyWritable implements WritableComparable<MyWritable> {
    private IntWritable tgt;
    private IntWritable weight;

    /** Creates a pair backed by two fresh {@link IntWritable}s so {@link #readFields} can populate it. */
    public MyWritable() {
        set(new IntWritable(), new IntWritable());
    }

    /**
     * @param tgt    target node id
     * @param weight edge weight
     */
    public MyWritable(int tgt, int weight) {
        set(new IntWritable(tgt), new IntWritable(weight));
    }

    /**
     * @param tgt    target node id holder; stored by reference
     * @param weight edge weight holder; stored by reference
     */
    public MyWritable(IntWritable tgt, IntWritable weight) {
        set(tgt, weight);
    }

    /** @return the target holder (the internal instance, not a copy) */
    public IntWritable getTgt() {
        return tgt;
    }

    /** @return the weight holder (the internal instance, not a copy) */
    public IntWritable getWeight() {
        return weight;
    }

    /**
     * Replaces both components. The arguments are stored by reference.
     *
     * @param tgt    target node id holder, must not be null
     * @param weight edge weight holder, must not be null
     * @throws NullPointerException if either argument is null
     */
    public void set(IntWritable tgt, IntWritable weight) {
        // Fail here rather than later inside write()/readFields()/compareTo().
        this.tgt = Objects.requireNonNull(tgt, "tgt");
        this.weight = Objects.requireNonNull(weight, "weight");
    }

    /** Orders by target, then by weight. */
    @Override
    public int compareTo(MyWritable o) {
        int cmp = tgt.compareTo(o.tgt);
        if (cmp == 0) {
            return weight.compareTo(o.weight);
        }
        return cmp;
    }

    /** Writes the target followed by the weight. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        tgt.write(dataOutput);
        weight.write(dataOutput);
    }

    /** Reads the target followed by the weight, mirroring {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        tgt.readFields(dataInput);
        weight.readFields(dataInput);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyWritable that = (MyWritable) o;
        return Objects.equals(tgt, that.tgt) &&
                Objects.equals(weight, that.weight);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tgt, weight);
    }

    /**
     * Returns {@code tgt,weight}. Text-based output relies on this method;
     * without it the inherited {@code Object.toString()} (class name plus hash)
     * would be printed instead of the pair.
     */
    @Override
    public String toString() {
        return tgt.get() + "," + weight.get();
    }
}

并更新代码,将其用作mapper&reducer中的值。比如:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class Q1 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");

        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyWritable.class);
        job.setMapOutputValueClass(MyWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TargetMapper extends Mapper<Object, Text, Text, MyWritable> {
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                Text target = new Text();
                target.set(edge[0]);
                int tgt = Integer.parseInt(edge[1]);
                int weight = Integer.parseInt(edge[2]);
                context.write(target, new MyWritable(tgt, weight));
            }
        }

    }

    public static class EmailsReducer extends Reducer<Text, MyWritable, Text, MyWritable> {
        private MyWritable res = new MyWritable();

        public void reduce(Text key, Iterable<MyWritable> targets, Context context) throws IOException, InterruptedException {
            int maxWeight = Integer.MIN_VALUE;
            int maxTgt = Integer.MIN_VALUE;

            for (MyWritable target : targets) {
                if (target.getWeight().get() > maxWeight) {
                    maxWeight = target.getWeight().get();
                    maxTgt = target.getTgt().get();
                }
            }

            res.set(new IntWritable(maxTgt), new IntWritable(maxWeight));

            context.write(key, res);
        }
    }
}

相关问题