mapper输出键降序

qoefvg9y  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(496)

我是hadoop新手。到目前为止,我正在尝试实现一个定制的可写比较器来按降序对map输出键进行排序,它是doublewriteable的。下面是我的比较器类:

class DecreasingComparator extends WritableComparator{
        protected DecreasingComparator(){
            super(DoubleWritable.class,true);
        }
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1,WritableComparable w2){
            DoubleWritable key1 = (DoubleWritable) w1;
            DoubleWritable key2 = (DoubleWritable) w2;
            return -1 * key1.compareTo(key2);
        }
    }

我得到的结果是:

在右边,它应该按键按降序排序。为什么它仍然是按值排序的,而值是不可写的?
Map器类不会有问题,因为它只输出键和值。我不知道该怎么办。mapper类如下所示:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondaryMapper extends Mapper<LongWritable,Text,DoubleWritable,IntWritable>{

    @Override
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String line = value.toString();
        DoubleWritable ratio = new DoubleWritable(Double.parseDouble(line.split("\\s")[0]));
        IntWritable id = new IntWritable(Integer.parseInt(line.split("\\s")[1]));
        context.write(ratio,id);
    }
}

下面是我的驾驶课:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

//second job import

public class CommentViewRatio {

    class DecreasingComparator extends WritableComparator{
        protected DecreasingComparator(){
            super(DoubleWritable.class,true);
        }
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1,WritableComparable w2){
            DoubleWritable key1 = (DoubleWritable) w1;
            DoubleWritable key2 = (DoubleWritable) w2;
            return -1 * key1.compareTo(key2);
        }
    }

    public static void main(String[] args) throws Exception{
        if(args.length != 2){
            System.out.printf("Usage: WordCount <input dir> <output dir>\n");
            System.exit(-1);
        }
        Configuration conf1 = new Configuration();
        Job job = new Job(conf1);

        job.setJarByClass(CommentViewRatio.class);
        job.setJobName("Average");
        Path temp = new Path("temp");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,  temp);

        job.setMapperClass(CommentViewMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
//      job.setPartitionerClass(CommentViewPartitioner.class);
        job.setReducerClass(CommentViewReducer.class);
//      job.setNumReduceTasks(4);

        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(LongWritable.class);

        boolean success = job.waitForCompletion(true);
        if(success) {
            Configuration conf2 = new Configuration();
            Job job2 = new Job(conf2);
            job2.setJarByClass(CommentViewRatio.class);

            FileInputFormat.addInputPath(job2, new Path("temp/part-r-00000"));
            FileOutputFormat.setOutputPath(job2, new Path(args[1]));

            job2.setMapperClass(SecondaryMapper.class);
            job2.setMapOutputKeyClass(DoubleWritable.class);
            job2.setMapOutputValueClass(IntWritable.class);
//          job2.setPartitionerClass(CommentViewPartitioner.class);
            job2.setSortComparatorClass(DecreasingComparator.class);
            job2.setNumReduceTasks(0);

            boolean success2 = job2.waitForCompletion(true);
            temp.getFileSystem(conf1).delete(temp);
            System.exit(success2 ? 0 : 1);
        }

    }
}

如有任何帮助,将不胜感激,谢谢阅读。

yi0zb3m4

yi0zb3m41#

显然,如果我想使用自定义排序比较器,我需要添加一个reducer类来对输出进行排序。
新驱动程序如下所示:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

//second job import

public class CommentViewRatio {

    public static class DecreasingComparator extends WritableComparator{
        protected DecreasingComparator(){
            super(DoubleWritable.class,true);
        }
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1,WritableComparable w2){
            DoubleWritable key1 = (DoubleWritable) w1;
            DoubleWritable key2 = (DoubleWritable) w2;
            return -1 * key1.compareTo(key2);
        }
    }

    public static void main(String[] args) throws Exception{
        if(args.length != 2){
            System.out.printf("Usage: WordCount <input dir> <output dir>\n");
            System.exit(-1);
        }
        Configuration conf1 = new Configuration();
        Job job = new Job(conf1);

        job.setJarByClass(CommentViewRatio.class);
        job.setJobName("Average");
        Path temp = new Path("temp");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,  temp);

        job.setMapperClass(CommentViewMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
//      job.setPartitionerClass(CommentViewPartitioner.class);
        job.setReducerClass(CommentViewReducer.class);
//      job.setNumReduceTasks(4);

        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(LongWritable.class);

        boolean success = job.waitForCompletion(true);
        if(success) {
            Configuration conf2 = new Configuration();
            Job job2 = new Job(conf2);
            job2.setJarByClass(CommentViewRatio.class);

            FileInputFormat.addInputPath(job2, new Path("temp/part-r-00000"));
            FileOutputFormat.setOutputPath(job2, new Path(args[1]));

            job2.setMapperClass(SecondaryMapper.class);
            job2.setMapOutputKeyClass(DoubleWritable.class);
            job2.setMapOutputValueClass(IntWritable.class);
          //job2.setPartitionerClass(CommentViewPartitioner.class);
            job2.setSortComparatorClass(DecreasingComparator.class);

            job2.setReducerClass(SecondaryReducer.class);
            job2.setOutputKeyClass(DoubleWritable.class);
            job2.setOutputValueClass(IntWritable.class);

          //job2.setNumReduceTasks(4);

            boolean success2 = job2.waitForCompletion(true);
            temp.getFileSystem(conf1).delete(temp);
            System.exit(success2 ? 0 : 1);
        }

    }
}

您所需要做的就是实现一个reducer类,以便对输出进行排序。奇怪的是,有些消息来源说writeablecomparator排序的是map输出,而不是reducer输出。

相关问题