读取文件内容时出错-mapreduce

ldfqzlk8  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(296)

读取作为参数传递给程序的文件的内容并在控制台中显示它是我的程序的一部分。当我读取文件的内容时,我得到了一个错误。当我省略setup()并运行我的程序时,它工作正常。但是我想显示文件的内容。下面是我的代码。

package search;

import java.io.BufferedReader;
   import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.text.Format;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.StringTokenizer;

import org.apache.commons.httpclient.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class search {

    public static class SearchMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        // Map code goes here.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void setup(Context context) throws IOException, InterruptedException{
            Configuration conf = context.getConfiguration();
            java.net.URI[] localPaths = context.getCacheFiles();
            BufferedReader reader = null;

            try {
                File file = new File(localPaths[0]);
                reader = new BufferedReader(new FileReader(file));

                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }

            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        } 

        public void map (LongWritable Key, Text value,Context context )throws IOException,InterruptedException{

            String txt= value.toString();

                word = context.getCurrentValue();
                context.getCurrentKey();
                word.set(txt);
                context.write(word, one);

    }
    }

    public static class SearchReducer extends
            Reducer<Text, IntWritable,  Text, IntWritable> {
        // Reduce code goes here.

         private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values,
             Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                   sum += val.get();
               }
               result.set(sum);
                context.write(key,result);
            }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: SearchCounter <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.addCacheFile(new Path("/Users/praveen/input/").toUri());
        job =new Job(conf);

        job.setJarByClass(search.class);
        job.setMapperClass(SearchMapper.class);
        job.setCombinerClass(SearchReducer.class);
        job.setReducerClass(SearchReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    }

我得到以下错误:

java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:403)
Caused by: java.lang.NullPointerException
    at search.search$SearchMapper.setup(search.java:59)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
2015-06-06 17:00:42,907 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1338)) - Job job_local1148982887_0001 running in uber mode : false
2015-06-06 17:00:42,909 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1345)) -  map 0% reduce 0%
2015-06-06 17:00:42,912 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1358)) - Job job_local1148982887_0001 failed with state FAILED due to: NA
2015-06-06 17:00:42,921 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1363)) - Counters: 0

请指出这里有什么问题。

6ss1mwsb

6ss1mwsb1#

为什么在将缓存文件添加到作业后重新创建该作业?尝试删除此行:

job =new Job(conf);

相关问题