I'm new to Hadoop. I'm trying to implement a custom WritableComparator that sorts the map output keys, which are DoubleWritable, in decreasing order. Here is my comparator class:
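import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

class DecreasingComparator extends WritableComparator {
    protected DecreasingComparator() {
        // true: WritableComparator creates key instances so serialized keys
        // can be deserialized before compare() below is called
        super(DoubleWritable.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        DoubleWritable key1 = (DoubleWritable) w1;
        DoubleWritable key2 = (DoubleWritable) w2;
        // negate the natural (ascending) order to get decreasing order
        return -1 * key1.compareTo(key2);
    }
}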
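DecreasingComparator only reverses DoubleWritable's natural ascending order. That reversal can be checked outside Hadoop with a plain `java.util.Comparator`; the following is a minimal JDK-only sketch (the class and method names are hypothetical, not Hadoop API). It swaps the arguments instead of multiplying by -1, which gives the same result here but cannot overflow if a comparator ever returns `Integer.MIN_VALUE`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class DescendingSortDemo {
    // Same idea as DecreasingComparator: reverse the natural ordering of the key.
    // Swapping the arguments avoids the overflow that -1 * compare(...) would hit
    // if a comparator ever returned Integer.MIN_VALUE.
    static final Comparator<Double> DECREASING = (a, b) -> Double.compare(b, a);

    // Returns a new list containing the keys in decreasing order.
    static List<Double> sortDecreasing(List<Double> keys) {
        List<Double> copy = new ArrayList<>(keys);
        copy.sort(DECREASING);
        return copy;
    }

    public static void main(String[] args) {
        List<Double> keys = Arrays.asList(0.25, 1.5, 0.75, 1.5, 0.0);
        System.out.println(sortDecreasing(keys)); // [1.5, 1.5, 0.75, 0.25, 0.0]
    }
}
```

In the Hadoop comparator, the equivalent change would be `return key2.compareTo(key1);`.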
The output I get is:
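On the right, the output should be sorted by key in decreasing order. Why is it still sorted by value, even though the value is not the key?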
The mapper class shouldn't be the problem, since it only emits the keys and values. I don't know what else to try. Here is the mapper class:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondaryMapper extends Mapper<LongWritable, Text, DoubleWritable, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // each input line is "<ratio><whitespace><id>", as written by the first job
        String[] fields = value.toString().split("\\s");
        DoubleWritable ratio = new DoubleWritable(Double.parseDouble(fields[0]));
        IntWritable id = new IntWritable(Integer.parseInt(fields[1]));
        // emit the ratio as the key and the id as the value
        context.write(ratio, id);
    }
}
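The mapper relies on each input line having the form `<ratio><whitespace><id>`; the first job's default TextOutputFormat separates key and value with a tab, which `\\s` matches. Below is a small JDK-only sketch of that parsing step, assuming that line format and assuming the ids fit in an `int` (as the original mapper already does). `RatioLineParser` and `Record` are hypothetical names. It splits the line once and fails with a clear message on malformed input.

```java
class RatioLineParser {
    // Parsed form of one input line: the ratio (map output key) and the id (value).
    static final class Record {
        final double ratio;
        final int id;

        Record(double ratio, int id) {
            this.ratio = ratio;
            this.id = id;
        }
    }

    // Splits once on any run of whitespace and parses both fields,
    // instead of calling split() twice per line.
    static Record parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 2) {
            throw new IllegalArgumentException("expected '<ratio> <id>', got: " + line);
        }
        return new Record(Double.parseDouble(fields[0]), Integer.parseInt(fields[1]));
    }

    public static void main(String[] args) {
        Record r = parse("0.75\t42");
        System.out.println(r.ratio + " " + r.id); // 0.75 42
    }
}
```

Inside `map()`, the parsed fields would then be wrapped as `new DoubleWritable(r.ratio)` and `new IntWritable(r.id)` before calling `context.write`.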
Here is my driver class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
//second job import
public class CommentViewRatio {
    // Must be static: Hadoop creates the sort comparator by reflection through a
    // no-arg constructor, which a non-static inner class does not have.
    public static class DecreasingComparator extends WritableComparator {
        protected DecreasingComparator() {
            super(DoubleWritable.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            DoubleWritable key1 = (DoubleWritable) w1;
            DoubleWritable key2 = (DoubleWritable) w2;
            return -1 * key1.compareTo(key2); // decreasing order
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: CommentViewRatio <input dir> <output dir>\n");
            System.exit(-1);
        }

        // Job 1: writes (ratio, id) pairs to the temporary directory "temp"
        Configuration conf1 = new Configuration();
        Job job = new Job(conf1);
        job.setJarByClass(CommentViewRatio.class);
        job.setJobName("Average");
        Path temp = new Path("temp");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, temp);
        job.setMapperClass(CommentViewMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // job.setPartitionerClass(CommentViewPartitioner.class);
        job.setReducerClass(CommentViewReducer.class);
        // job.setNumReduceTasks(4);
        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(LongWritable.class);
        boolean success = job.waitForCompletion(true);

        if (success) {
            // Job 2: reads job 1's output and should sort it by ratio, descending
            Configuration conf2 = new Configuration();
            Job job2 = new Job(conf2);
            job2.setJarByClass(CommentViewRatio.class);
            FileInputFormat.addInputPath(job2, new Path("temp/part-r-00000"));
            FileOutputFormat.setOutputPath(job2, new Path(args[1]));
            job2.setMapperClass(SecondaryMapper.class);
            job2.setMapOutputKeyClass(DoubleWritable.class);
            job2.setMapOutputValueClass(IntWritable.class);
            // job2.setPartitionerClass(CommentViewPartitioner.class);
            job2.setSortComparatorClass(DecreasingComparator.class);
            job2.setNumReduceTasks(0);
            boolean success2 = job2.waitForCompletion(true);
            // delete the intermediate directory recursively
            temp.getFileSystem(conf1).delete(temp, true);
            System.exit(success2 ? 0 : 1);
        }
    }
}
Any help would be appreciated. Thanks for reading.
1 Answer
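Apparently, to use a custom sort comparator I need a reduce phase. With `job2.setNumReduceTasks(0)` the second job is map-only, so the mapper output is written straight to the output directory in the order it was emitted (here, the order of the first job's output), and the sort comparator is never applied.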
The new driver looks like this:
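All you need to do is add a reducer so that the output gets sorted. Dropping `job2.setNumReduceTasks(0)` is enough, since the default is one reduce task and the default `Reducer` writes every (key, value) pair through unchanged. Strangely, some sources say that the WritableComparator sorts the map output rather than the reducer output. Both statements fit together: the sort comparator does sort the map output keys, but that sort happens only during the shuffle, and a map-only job has no shuffle. With more than one reducer, each output file is sorted on its own rather than globally.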
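The effect of the reduce phase can be illustrated with a toy model in plain Java. This is not Hadoop code and the names are hypothetical: it only mimics the order in which records reach the output, assuming a single pass-through reducer and map output that arrives in the order of the first job's keys (the ids).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class ShuffleModel {
    static final Comparator<Double> DECREASING = (a, b) -> Double.compare(b, a);

    // Toy model of what happens to mapper output (not Hadoop's real implementation).
    // numReduceTasks == 0: map-only job, records are written in the order the mapper
    //   emitted them, so the sort comparator is never consulted.
    // numReduceTasks >= 1: records are sorted by key with the sort comparator during
    //   the shuffle; a single pass-through reducer keeps that order in its output.
    static List<Map.Entry<Double, Integer>> run(List<Map.Entry<Double, Integer>> mapOutput,
                                                int numReduceTasks,
                                                Comparator<Double> sortComparator) {
        List<Map.Entry<Double, Integer>> out = new ArrayList<>(mapOutput);
        if (numReduceTasks > 0) {
            out.sort((x, y) -> sortComparator.compare(x.getKey(), y.getKey()));
        }
        return out;
    }

    // Sample (ratio, id) records in the order of the first job's output (by id).
    static List<Map.Entry<Double, Integer>> sample() {
        List<Map.Entry<Double, Integer>> list = new ArrayList<>();
        list.add(new SimpleEntry<Double, Integer>(0.5, 1));
        list.add(new SimpleEntry<Double, Integer>(2.0, 2));
        list.add(new SimpleEntry<Double, Integer>(1.25, 3));
        return list;
    }

    public static void main(String[] args) {
        System.out.println(run(sample(), 0, DECREASING)); // [0.5=1, 2.0=2, 1.25=3]
        System.out.println(run(sample(), 1, DECREASING)); // [2.0=2, 1.25=3, 0.5=1]
    }
}
```

With zero reduce tasks the records come out unchanged, which is the "still sorted by value" symptom from the question; with one reduce task they come out in the comparator's decreasing order.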