缺少map/combine/reduce逻辑,关于如何跟踪某个东西

xqk2d5yq  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(292)

我试着用map/reducer做不同的工作人员,而不是以前我做的。
我现在有一个文件输入如下:

1 50000 2015 pc technology 
2 15424 1998 mouse technology 
3 78420 2010 pen technology 
4 8452 2000 pen stationery
5 4125 2000 pen stationery

id、价格、年份、项目、类型
我要做的是计算一种特定商品的平均价格,每种商品的平均价格,以及每一年的平均价格。所以,举个例子,我开始为钢笔做这些东西。2000年钢笔的平均价格是多少?在我的示例中,有两种笔(pc的数字笔和标准笔),因此我希望有如下输出:

pen stationery 6288 2000
pen technology 78420 2010

我的问题是我不知道怎么做。。。我知道如何使用合并器/减速机计算平均值,但我不知道如何记录年份和项目。。。我的逻辑是这样的:检查物品是否是笔,然后记录年份和笔的价格,用这些数据计算平均值。但我不知道怎么做。任何帮助都非常感谢,谢谢

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Draft2 {

     public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{

     private Text word = new Text(); 
     private Text word2 = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

         String[] tokens = value.toString().split(",");

         String price = "";
         String item = "";
         String typ = "";
         String year = "";

         if(tokens.length >= 2)
             price = tokens[1];

         if(tokens.length >= 3)
             year = tokens[2];

         if (tokens.length >=5)
             typ = tokens[4];

         if (tokens.length >=14)
             item = tokens[13];

         if(!item.isEmpty())
         {
             word.set(item + "|" + year);
             word2.set(price + "|" + typ);
             context.write(word, word2);
         }
    }
  }

  public static class MeanCombiner extends Reducer <Text,Text,Text,Text> {
      private Text word = new Text();
      private Text word2 = new Text();

      public void combine(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
          int sum = 0;
          int counter = 0;

          final Iterator<Text> itr = values.iterator();
          String[] strs = key.toString().split("|");
          final String[] tokens = values.toString().split("|");

          String item = strs[0];
          String typ = tokens[1];
          String type = "pen";
          String year = strs[1];
          Boolean found = false;

       for (Text val: values) {
          String[] strs = key.toString().split("|");
          final String[] tokens = values.toString().split("|");                
          String item= strs[0];
          String typ = tokens[1];
          String type = "pen";
          String year = strs[1];

          if (typ.equals(type)) {
              found = true;
          //don't know how to go on       

          }

          /*this part is for the average but with wrong data*/
          while (itr.hasNext()) {
                if (typ.equals(type)) {
                    final String price = tokens[0];
                    final int value = Integer.parseInt(price);
                    counter++;
                    sum += value;
                  }
          }

          final int average = sum/counter;

      String avg = Integer.toString(average);
      String count = Integer.toString(counter);
          word.set(key.toString());
          word2.set(avg + "|" + count);
          context.write(word, word2);
         }
  }

  /*The reducer is incomplete*/
  public static class MeanReducer extends Reducer<Text,Text,Text,Text> {

    private Text word = new Text();
    private Text word2 = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context
                       ) throws IOException, InterruptedException {

    word.set(k);
    word2.set();
    context.write(word, word2);

    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Draft2");
    job.setJarByClass(Draft2.class);
    job.setCombinerClass(MeanCombiner.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(MeanReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
p5cysglq

p5cysglq1#

计算平均值时不应使用合并器。
假设以下是您的输入,我将解释解决方案:

1 50000 2015 pc technology 
2 15424 1998 mouse technology 
3 78420 2010 pen technology 
4 8452 2000 pen stationery
5 4125 2000 pen stationery

Map器:
通过在空白处拆分(“”)来分析每条记录
对于每条记录,将(key,value)作为(year | item | type,price)发出。组合(year | item | type)将为每个记录提供一个唯一的键。例如,对于记录“5 4125 2000 pen文具”,您将发出键:“2000 | pen |文具”,值为:4125。
因此Map器的输出将是:

2015|pc|technology    50000 
1998|mouse|technology 15424 
2010|pen|technology   78420 
2000|pen|stationery   8452 
2000|pen|stationery   4125

减速器:
对于每个键,先计算和,然后计算平均值。
发射平均值以及其他细节
例如,以下键将连接到同一减速器:

2000|pen|stationery   8452 
2000|pen|stationery   4125

输出为:

pen stationery 6288 2000

相关问题