我看过一些相关的帖子,但在我的情况下,所有的事情都很好,但当我运行我的Map缩小工作,缩小部分被跳过。我正在努力摆脱这个问题很多。任何人请找出问题,并在得到工作运行的帮助。
Map任务:
package org.netflix.rating;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NetflixAvgRatingMap extends Mapper<LongWritable,Text,LongWritable,IntWritable>{
public void map(LongWritable key,Text value,Context context){
String NetflixEntrypattern="^(\\s*)([0-9]+)(\\s+)([0-9]+)(\\s+)(\\d{1})(\\s+)(.*)";
Pattern p = Pattern.compile(NetflixEntrypattern);
Matcher matcher = p.matcher(value.toString());
if (!matcher.matches()) {
return;
}
Long movie_id=Long.parseLong(matcher.group(4));
int rating=Integer.parseInt(matcher.group(6));
try {
context.write(new LongWritable(movie_id),new IntWritable(rating));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
减少任务:
package org.netflix.rating;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class NetflixAvgRatingReducer extends Reducer<LongWritable,IntWritable,LongWritable,FloatWritable> {
public void reduce(LongWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int sum=0,count=0;
while(values.iterator().hasNext()){
sum+=values.iterator().next().get();
count++;
}
float avg=(float)sum/count;
context.write(key,new FloatWritable(avg));
}
}
驾驶员等级:
package org.netflix.rating;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class NetflixAvgRating {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Create job
Job job = new Job(conf, "NetflixAvgRating");
job.setJarByClass(NetflixAvgRating.class);
// Setup MapReduce job
job.setMapperClass(NetflixAvgRatingMap.class);
job.setReducerClass(NetflixAvgRatingReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(FloatWritable.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(IntWritable.class);
// Input
FileInputFormat.addInputPath(job, new Path(args[0]));
//job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//job.setOutputFormatClass(TextOutputFormat.class);
// Execute job
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
}
我已经正确设置了所有配置和参数,但是我的reduce任务不再执行了。如有任何建议,将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!