reducer无法按键对来自不同Mapper的数据进行分组

1u4esq0p  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(540)

用例:
文件1包含impression数据(trackerid+其他字段)
文件2包含click详情(trackerid+clicked)
我对上述两个文件分别使用了不同的Mapper,并共用一个reducer,但reducer似乎无法把这两个文件中的数据按键合并到一起。

package com.hadoop.intellipaat;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.google.common.collect.Lists;

/**
 * This job will combine click and impression on TrackerId
 * 
 * @author raghunandangupta
 *
 */

public class JoinClickImpressionDetailJob {

    public static final String IMPRESSION_PREFIX = "IMPRESSION_PREFIX";
    public static final String CLICK_PREFIX = "CLICK_PREFIX";
    public static final String SEPERATOR = "~";

    private static class ImpressionMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            /**
             * Excluding header
             */
            if (!(value.toString().indexOf("accountId") != -1)) {
                String words[] = value.toString().split(",");
                if (words.length > 18) {
                    context.write(new Text(words[18].trim()), new Text(IMPRESSION_PREFIX + SEPERATOR + value.toString()));
                }
            } else {
                context.write(new Text(""), value);
            }
        }
    }

    private static class ClickMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String words[] = value.toString().split(",");
            if (words.length > 18) {
                context.write(new Text(words[18].trim()), new Text(CLICK_PREFIX + SEPERATOR + value.toString()));
            } else {
                context.write(new Text(""), new Text("1"));
            }
        }
    }

    private static class ImpressionClickReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) {
            try {
                System.out.println("=========="+key.toString());
                if (key.toString().length() != 0) {
                    List<Text> myList = Lists.newArrayList(values);

                    for(Text t : myList){
                        System.out.println("#######"+t.toString());
                    }
                    System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@");
                    if (myList.size() == 2) {
                        if (myList.get(0).toString().indexOf(IMPRESSION_PREFIX) != -1 && myList.get(1).toString().indexOf(CLICK_PREFIX) != -1) {
                            String line = myList.get(0).toString().split(SEPERATOR)[1] + ",1";
                            context.write(key, new Text(line));
                        } else if (myList.get(1).toString().indexOf(IMPRESSION_PREFIX) != -1
                                && myList.get(0).toString().indexOf(CLICK_PREFIX) != -1) {
                            String line = myList.get(1).toString().split(SEPERATOR)[1] + ",1";
                            context.write(key, new Text(line));
                        }
                    }
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // conf.set("mapreduce.output.fileoutputformat.compress", "true");
            // conf.set("mapreduce.output.fileoutputformat.compress.codec",
            // "org.apache.hadoop.io.compress.GzipCodec");
            // conf.set("mapreduce.map.output.compress.codec",
            // "org.apache.hadoop.io.compress.SnappyCodec");
            // conf.set("mapreduce.output.fileoutputformat.compress.type",
            // "BLOCK");
            Job job = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setReducerClass(ImpressionClickReducer.class);

            FileInputFormat.setInputDirRecursive(job, true);

            // FileInputFormat.addInputPath(job, new Path(args[0]));
            // job.setMapperClass(ImpressionMapper.class);

            /**
             * Here directory of impressions will be present
             */
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
            /**
             * Here directory of clicks will be present
             */
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);

            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

任何线索都将不胜感激。
例如:
File 1 [trackerId1,record1],File 2 [trackerId1,Clicked]。我在reducer中看到的是:trackerId,[record1,record1];而理想情况下应该是:trackerId,[record1,clicked]。

kgsdhlau

kgsdhlau1#

您的问题很可能出在reducer中的这一行: List<Text> myList = Lists.newArrayList(values); 需要记住的是, Iterable<Text> values 在迭代过程中会重复使用同一个 Text 对象,每次只是更新它的内容。所以您虽然向列表中添加了两个 Text 引用,但它们指向的是同一个对象。
如果您查看 Lists.newArrayList() 的实现,就会发现它只是把迭代得到的对象引用直接加入列表,而不会创建新的对象。
因此,如果要保存 Text ,每次向列表中添加值时都需要新建一个 Text 对象。这也是人们在这种情况下通常改用 String 的一个典型原因。如果想快速验证问题是否出在这里,请将代码改为:

// Copy each value into a fresh Text: the values iterable hands back the same
// reused Text instance on every iteration, so storing it directly would leave
// every list entry pointing at one object.
List<Text> myList = new ArrayList<Text>();
for (Text v : values) {
    myList.add(new Text(v));
}

这样,每次迭代都会创建一个新的 Text 对象。

相关问题