FileAlreadyExistsException

oknwwptz · posted 2021-06-02 in Hadoop

This program is supposed to run two chained MapReduce jobs: the output of the first job must serve as the input of the second.
When I run it, I get two errors:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException
The map phase runs to 100%, but the reducer never runs.
Here is my code:

import java.io.IOException;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;

public class MaxPubYear {
    // First job's mapper: split each record on ';' and emit (publication year, 1).
    public static class FrequencyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Text word = new Text();
            String delim = ";";
            Integer year = 0;
            String tokens[] = value.toString().split(delim);
            if (tokens.length >= 4) {
                year = TryParseInt(tokens[3].replace("\"", "").trim());
                if (year > 0) {
                    word = new Text(year.toString());
                    context.write(word, new IntWritable(1));
                }
            }
        }
    }

    // First job's reducer (also used as the combiner): sum the counts for each year.
    public static class FrequencyReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Second job's mapper: read the first job's "year<TAB>count" lines and emit
    // them all under the constant key 1 so a single reducer sees every year.
    public static class MaxPubYearMapper extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String delim = "\t";
            Text valtosend = new Text();
            String tokens[] = value.toString().split(delim);
            if (tokens.length == 2) {
                valtosend.set(tokens[0] + ";" + tokens[1]);
                context.write(new IntWritable(1), valtosend);
            }

        }
    }

    // Second job's reducer: scan all "year;count" pairs and keep the year with the highest count.
    public static class MaxPubYearReducer extends
            Reducer<IntWritable, Text, Text, IntWritable> {

        public void reduce(IntWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            int maxiValue = Integer.MIN_VALUE;
            String maxiYear = "";
            for (Text value : values) {
                String token[] = value.toString().split(";");
                if (token.length == 2
                        && TryParseInt(token[1]).intValue() > maxiValue) {
                    maxiValue = TryParseInt(token[1]);
                    maxiYear = token[0];
                }
            }
            context.write(new Text(maxiYear), new IntWritable(maxiValue));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // First job: count publications per year.
        Job job = Job.getInstance(conf, "Frequency");
        job.setJarByClass(MaxPubYear.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(FrequencyMapper.class);
        job.setCombinerClass(FrequencyReducer.class);
        job.setReducerClass(FrequencyReducer.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        // First job's output, which becomes the second job's input; it must not already exist.
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_temp"));
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        if (exitCode == 0) {
            // Second job: find the year with the most publications.
            Job secondJob = Job.getInstance(conf, "Maximum Publication year");
            secondJob.setJarByClass(MaxPubYear.class);

            secondJob.setOutputKeyClass(Text.class);
            secondJob.setOutputValueClass(IntWritable.class);

            secondJob.setMapOutputKeyClass(IntWritable.class);
            secondJob.setMapOutputValueClass(Text.class);

            secondJob.setMapperClass(MaxPubYearMapper.class);
            secondJob.setReducerClass(MaxPubYearReducer.class);

            FileInputFormat.addInputPath(secondJob, new Path(args[1] + "_temp"));
            FileOutputFormat.setOutputPath(secondJob, new Path(args[1]));
            System.exit(secondJob.waitForCompletion(true) ? 0 : 1);

        }
    }

    // Parse a string as an integer; return 0 when it is not a valid number.
    public static Integer TryParseInt(String trim) {
        try {
            return Integer.parseInt(trim);
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}
tpgth1q7 #1

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException

A MapReduce job will not overwrite the contents of an existing directory: the output path of an MR job must be a directory that does not yet exist. The job creates that directory itself and writes its output files (the part-r-* files) into it.
In your code:
FileOutputFormat.setOutputPath(job, new Path(args[1] + "_temp"));
Make sure this path does not exist each time you run the job.
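
A simple way to avoid the exception is to delete the old output directories programmatically before submitting the jobs. The sketch below uses Hadoop's FileSystem API and assumes the same conf and args[1] convention as the code above:

// Remove stale output directories so the jobs can recreate them fresh.
FileSystem fs = FileSystem.get(conf); // org.apache.hadoop.fs.FileSystem
for (Path out : new Path[] { new Path(args[1] + "_temp"), new Path(args[1]) }) {
    if (fs.exists(out)) {
        fs.delete(out, true); // true = delete recursively
    }
}

Alternatively, remove the directories from the shell before each run, e.g. hadoop fs -rm -r <output-path>.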
