map-reduce: wordcount does nothing

b4qexyjb posted on 2021-06-04 in Hadoop

I'm trying to write my own word count example with MapReduce and Hadoop v1.0.3 (I'm on macOS), but I don't understand why the code I'm sharing doesn't work:
The main class:

package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        // set job name, mapper, combiner, and reducer classes
        conf.setJobName("WordCount");
        // set input and output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // set input and output paths
        //FileInputFormat. setInputPaths(conf, new Path(input));
        //FileOutputFormat.setOutputPath(conf, new Path(output));
        FileOutputFormat.setCompressOutput(conf, false);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(org.myorg.Map.class);
        conf.setReducerClass(org.myorg.Reduce.class);
        String host = args[0];
        String input = host + "/" + args[1];
        String output = host + "/" + args[2];
        // set input and output paths
        FileInputFormat.addInputPath(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));
        JobClient j=new JobClient(conf);
        (j.submitJob(conf)).waitForCompletion();
    }
}

The Mapper:

package org.myorg;

import java.io.IOException;
import java.util.HashMap;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.Vector;
import java.util.Map.Entry;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>  {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {

        MapWritable hs = new MapWritable();
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            //hs.put(word, one);
            output.collect(word,one);
        }
        // TODO Auto-generated method stub
    }
}

The Reducer:

package org.myorg;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.math.RoundingMode;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Vector;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Reducer.Context;

//public class Reduce extends MapReduceBase implements Reducer<Text, MapWritable, Text, Text> {
public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        String host = "hdfs://localhost:54310/";
        String tmp = host + "Temporany/output.txt";
        FileSystem srcFS;
        try {
            srcFS = FileSystem.get(new URI(tmp), new JobConf());
            srcFS.delete(new Path(tmp), true);
            BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(
            srcFS.create(new Path(tmp))));
            wr.write(key.toString() + ":" + sum);
            wr.close();
            srcFS.close();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        //   context.write(key, new IntWritable(sum));
    }
}

The job starts and finishes without errors, but the output file is never written. I launch the jar with Hadoop using the following command:

./hadoop jar /Users/User/Desktop/hadoop/wordcount.jar hdfs://localhost:54310 /In/testo.txt /Out/wordcount17

This is the output:

2014-03-03 17:56:22.063 java[6365:1203] Unable to load realm info from SCDynamicStore
14/03/03 17:56:22 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/03/03 17:56:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/03 17:56:23 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/03 17:56:23 INFO mapred.FileInputFormat: Total input paths to process : 1

I thought the problem was the "Unable to load native-hadoop library" warning, but other jars work fine despite it.

nuypyhwy1#

Q: The job starts and ends without errors, but does not write the output file??
A: I'm not sure the job really ends successfully without errors.
Problems:
Job configuration:

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

The setOutputKeyClass() and setOutputValueClass() methods control the output types for both the map and reduce functions, which are often the same. If they differ, you can set the map output types separately with setMapOutputKeyClass() and setMapOutputValueClass().
The output classes in this example are:

Map key : Text
Map Value : IntWritable 
Reduce key : Text 
Reduce Value : Text

This will cause a Type mismatch exception in the reduce phase.
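If you want to keep the Reducer's <Text, Text> signature, the map output types have to be declared separately in the driver. A minimal sketch of that part of the configuration (the rest of the driver is assumed unchanged):

conf.setMapOutputKeyClass(Text.class);          // what the Mapper emits
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);             // what the Reducer emits
conf.setOutputValueClass(Text.class);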
I don't see why you are writing your output to a file by hand through the HDFS API.
You should use output.collect(key, value) instead.
And with more than one reducer, are you handling simultaneous writes to the same file? I also wonder what context.write would have done here in the old API (it is commented out).
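A minimal sketch of the Reducer rewritten to emit through the collector instead of writing to HDFS by hand. Its output value type becomes IntWritable, which matches the setOutputValueClass(IntWritable.class) already in the driver, so no extra setMapOutput* calls are needed:

package org.myorg;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // sum the counts emitted by the mappers for this word
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // the framework writes this pair to the job's output path;
        // no manual FileSystem calls are needed
        output.collect(key, new IntWritable(sum));
    }
}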
To learn more, you can debug the MapReduce job with:
Counters (see the sketch below)
The JobTracker web interface
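For example, a finished job's counters can be read from the RunningJob handle (Counters and RunningJob live in org.apache.hadoop.mapred); if "Reduce output records" is zero, the reduce phase never emitted anything. A small sketch, assuming the driver switches to JobClient.runJob():

RunningJob job = JobClient.runJob(conf);   // blocks until the job finishes
for (Counters.Group group : job.getCounters()) {
    for (Counters.Counter counter : group) {
        System.out.println(group.getDisplayName() + "\t"
                + counter.getDisplayName() + " = " + counter.getCounter());
    }
}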
Q: What is the difference between submitJob() and waitForCompletion()?
A: submitJob() submits the job and returns.
waitForCompletion() submits the job and also prints its status on the console until it finishes. So waitForCompletion() is submitJob() plus status updates until the job completes.
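In the old mapred API the two variants look roughly like this; JobClient.runJob() is the one call that both submits and reports progress on the console:

JobClient client = new JobClient(conf);
RunningJob job = client.submitJob(conf);   // returns immediately
job.waitForCompletion();                   // blocks, but prints nothing

// or submit and monitor in one call:
JobClient.runJob(conf);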
For word count itself, please read the Apache MapReduce tutorial.
You can also find hadoop-examples-X.X.X.jar in the installation folder and go through the source code under $HADOOP_HOME/src/examples/.

** $HADOOP_HOME = the Hadoop installation folder
