mapreduce reducer错误输出

e0bqpujr  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(281)

我有一个很大的tsv文件,有以下输入:

Site1 Tag1
Site1 Tag34
Site1 Tag8
Site2 Tag75
Site2 Tag54
Site2 Tag8
Site3 Tag24
Site3 Tag34
Site3 Tag1
...

我想在 Hadoop MapReduce 的帮助下,找出输入中所有相似的站点对,以及每个站点对共有的相同标签的数量。
部分输入输出:

Site1 Site2 1  // Site1 is similar to Site2 with 1 tag (Tag8)
Site1 Site3 2  // Site1 is similar to Site3 with 2 tag (Tag1 and Tag34)
Site2 Site1 1
Site3 Site1 2

我想输出每个网站只有10个最相似的网站。
每个站点有3个标签
我想使用2个mapreduce作业:
第一个作业:Map 阶段以标签为键、站点为值输出,并按标签进行 Reduce;在 Reduce 阶段收集拥有同一标签的所有站点,写出形如“tag sitex sitey”的记录。
第二个 MapReduce 作业以第一个作业的输出作为输入,按 (sitex, sitey) 对进行分组,以统计每个相似站点对中相同标签的数量。
我尝试实现了第一个 MapReduce 作业,但得到的输出只是“tag,site”。

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RawToSimilarTagMapper {

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> {

        private Text    site    = new Text();
        private Text    tag     = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String [] siteTag = value.toString().split("\t");
            site.set(siteTag[0]);
            tag.set(siteTag[1]);

            context.write(tag, site);
            System.out.println();
        }
    }

    public static class SimilarSiteReducer extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        public void reduce(Text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, InterruptedException {
            for (Text text : values) {
                for (Text text2 : values) {
                    if (!text.equals(text2)) {
                        value.set(text.toString() + "\t" + text2.toString());
                        output.collect(key, value);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "raw-to-similar");
        job.setJarByClass(RawToSimilarTagMapper.class);
        job.setMapperClass(TagToSiteMapper.class);
        job.setCombinerClass(SimilarSiteReducer.class);
        job.setReducerClass(SimilarSiteReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileSystem fs = null;
        Path dstFilePath = new Path(args[2]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

我做错什么了?
在下一个阶段,我如何才能得到每个网站的前10个最相似的网站?

yc0p9oo0

yc0p9oo01#

我会这样做的。此外,您还可以通过在第二个作业的输出上编写第三个作业来进行排序,以获得前十个站点。(提示:您只需要编写Map器)注意:这适用于所提供的示例数据。您可能需要对格式错误的数据进行初始清理。
最终输出:

Site2   2
Site2   Site1   1
Site3   1
Site3   Site1   2

代码:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TopSites{

    public static class TagToSiteMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String [] siteTag = value.toString().split("\t");
            context.write(new Text(siteTag[1]), new Text(siteTag[0]));
            System.out.println(siteTag[1] + " --> " + siteTag[0]);
        }
    }

    public static class TagToSiteReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String l =  "";
            System.out.print("Key: [" + key.toString() + "] Values: [");

            for (Text site : values)
                l += site + "\t";

            l=l.substring(0, l.length()-1);
            System.out.println(l + "]");
            context.write(new Text(key), new Text(l));
        }
    }
    public static class TopSiteMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            String [] data = value.toString().split("\t");
            String sites ="";
            System.out.println("map received: "+ value.toString());

            for(int i=1;i<data.length;i++)
                sites += data[i] + "\t";    

            System.out.println(sites.substring(0,sites.length()-1));
            context.write(new Text(sites.substring(0,sites.length()-1)), one);
        }
    }

    public static class TopSiteReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum =  0;
            System.out.print("Key: [" + key.toString() + "] Values: [");

            for (IntWritable site : values){
                System.out.print(site.get());
                sum+=site.get();
            }
            System.out.println("]");
            context.write(new Text(key), new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job  = Job.getInstance(conf, "site-to-tag");

        job.setJarByClass(TopSites.class);
        job.setMapperClass(TagToSiteMapper.class);
        job.setReducerClass(TagToSiteReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job,  new Path(args[0]),TextInputFormat.class, TagToSiteMapper.class);

        Path outputpath = new Path(args[1]+"_temp");
        FileOutputFormat.setOutputPath(job,outputpath);

        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);

            dstFilePath = new Path(args[1]+"_temp");
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        int code = job.waitForCompletion(true)?0:1;
        if(code == 0)
        {
            Job SecondJob = Job.getInstance(conf, "Tag-to-Sites");
            SecondJob.setJarByClass(TopSites.class);

            SecondJob.setOutputKeyClass(Text.class);
            SecondJob.setOutputValueClass(IntWritable.class);

            SecondJob.setMapperClass(TopSiteMapper.class);
            SecondJob.setCombinerClass(TopSiteReducer.class);
            SecondJob.setReducerClass(TopSiteReducer.class);

            FileInputFormat.addInputPath(SecondJob,new Path(args[1]+ "_temp"));
            FileOutputFormat.setOutputPath(SecondJob,new Path(args[1]));
            int exitCode = SecondJob.waitForCompletion(true)?0:1;
            FileSystem.get(conf).delete(new Path(args[1]+"_temp"), true);
            System.exit(exitCode);
        }
    }
}

控制台标准输出:

Tag1 --> Site1
Tag34 --> Site1
Tag8 --> Site1
Tag75 --> Site2
Tag54 --> Site2
Tag8 --> Site2
Tag24 --> Site3
Tag34 --> Site3
Tag1 --> Site3
Key: [Tag1] Values: [Site3  Site1]
Key: [Tag24] Values: [Site3]
Key: [Tag34] Values: [Site3 Site1]
Key: [Tag54] Values: [Site2]
Key: [Tag75] Values: [Site2]
Key: [Tag8] Values: [Site2  Site1]
map received: Tag1  Site3   Site1
Site3   Site1
map received: Tag24 Site3
Site3
map received: Tag34 Site3   Site1
Site3   Site1
map received: Tag54 Site2
Site2
map received: Tag75 Site2
Site2
map received: Tag8  Site2   Site1
Site2   Site1
Key: [Site2] Values: [11]
Key: [Site2 Site1] Values: [1]
Key: [Site3] Values: [1]
Key: [Site3 Site1] Values: [11]
Key: [Site2] Values: [2]
Key: [Site2 Site1] Values: [1]
Key: [Site3] Values: [1]
Key: [Site3 Site1] Values: [2]
wnrlj8wa

wnrlj8wa2#

看来问题出在你的 Combiner 上。Combiner 的输出类型必须与 Mapper 的输出类型一致,而在你的代码中并非如此。Combiner 仅是性能优化,你可以把设置 Combiner 的那一行注释掉,再运行同一程序试试。

相关问题