Why is the mapper's map function not called when using SequenceFileInputFormat?

Posted by bt1cpqcv on 2021-06-03 in Hadoop

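I have spent two days on this problem. Thanks in advance to anyone who can help! The details are below:
The first mapper and reducer work fine, and their output, written with SequenceFileOutputFormat, can be found in the output path.
First mapper: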

public static class TextToRecordMapper 
    extends Mapper<Object, Text, Text, IntArrayWritable>{   

    public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {    
    }
}

First reducer:

public static class MacOneSensorSigCntReducer 
      extends Reducer<Text,IntArrayWritable,Text,IntArrayWritable> {

    public void reduce(Text key, Iterable<IntArrayWritable> values, 
                  Context context
                  ) throws IOException, InterruptedException {  
    }
}

Job setup:

Job job = new Job(conf, "word count");
job.setJarByClass(RawInputText.class);

job.setMapperClass(TextToRecordMapper.class);

job.setReducerClass(MacOneSensorSigCntReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntArrayWritable.class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

job.waitForCompletion(true);

This works fine. I then added a second mapper and reducer to process the output of the first job.
Second mapper:

public static class MacSensorsTimeLocMapper 
  extends Mapper<Text,IntArrayWritable,Text,IntWritable> {

    private Text macInfo = new Text();
    public void map(Text key, Iterable<IntArrayWritable> values, 
                  Context context
                  ) throws IOException, InterruptedException {
    }
}

Second reducer:

public static class MacInfoTestReducer 
    extends Reducer<Text,IntWritable,Text,Text> {
        public void reduce(Text key, Iterable<IntWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {
    }
}

Job setup:

Job secondJob = new Job(conf, "word count 2");

secondJob.setJarByClass(RawInputText.class);

FileInputFormat.addInputPath(secondJob, new Path(otherArgs[1]));
secondJob.setInputFormatClass(SequenceFileInputFormat.class);

secondJob.setMapperClass(MacSensorsTimeLocMapper.class);    

secondJob.setMapOutputKeyClass(Text.class);    
secondJob.setMapOutputValueClass(IntArrayWritable.class); 

//do not use test reducer to make things simple    
//secondJob.setReducerClass(MacInfoTestReducer.class);       

FileOutputFormat.setOutputPath(secondJob, new Path(otherArgs[2]));

System.exit(secondJob.waitForCompletion(true) ? 0 : 1);

When I run the code, the second mapper's map function is never called, and the output is generated with text like this:

00:08:CA:6C:A2:81   com.hicapt.xike.IntArrayWritable@234265

It seems the framework is calling the identity mapper instead of mine. How can I change this so that my mapper is called with SequenceFileInputFormat as the input format?
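One likely cause, offered as a hedged reading of the code in this post rather than a confirmed diagnosis: the second mapper declares `map(Text key, Iterable<IntArrayWritable> values, Context context)`, but a `Mapper<Text,IntArrayWritable,...>` is invoked through `map(Text, IntArrayWritable, Context)`. A method taking an `Iterable` is a new overload, not an override, so the inherited default `map` (which passes each record through unchanged) would still run, regardless of the input format. The sketch below reproduces that effect in plain Java. `BaseMapper`, `OverloadingMapper`, `OverridingMapper`, and `runOnce` are hypothetical stand-ins invented for illustration, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

final class OverloadVsOverrideDemo {

    /** Hypothetical stand-in for the framework's Mapper: the default map() passes records through. */
    static class BaseMapper<K, V> {
        final List<String> output = new ArrayList<>();

        protected void map(K key, V value) {
            output.add("identity:" + key + "=" + value);
        }

        /** The framework side: it only ever calls map(K, V), once per record. */
        final void run(K key, V value) {
            map(key, value);
        }
    }

    /** Declares map(String, Iterable<Integer>): a new overload that run() never calls. */
    static class OverloadingMapper extends BaseMapper<String, Integer> {
        public void map(String key, Iterable<Integer> values) {
            output.add("custom:" + key);
        }
    }

    /** Matches the base signature, so @Override compiles and run() dispatches here. */
    static class OverridingMapper extends BaseMapper<String, Integer> {
        @Override
        protected void map(String key, Integer value) {
            output.add("custom:" + key + "=" + value);
        }
    }

    /** Feeds a single record through the given mapper and returns what it emitted. */
    static List<String> runOnce(BaseMapper<String, Integer> mapper) {
        mapper.run("00:08:CA:6C:A2:81", 10);
        return mapper.output;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(new OverloadingMapper())); // [identity:00:08:CA:6C:A2:81=10]
        System.out.println(runOnce(new OverridingMapper()));  // [custom:00:08:CA:6C:A2:81=10]
    }
}
```

Applied to the question, this would mean changing the second mapper's parameter from `Iterable<IntArrayWritable> values` to `IntArrayWritable value` and annotating the method with `@Override`, so the compiler rejects a signature that does not match. Once the custom mapper does run, `setMapOutputValueClass(IntArrayWritable.class)` would presumably also need to agree with the `IntWritable` that the mapper is declared to emit.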
The full code is below:

import java.io.IOException;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class RawInputText {

  public static class TextToRecordMapper 
       extends Mapper<Object, Text, Text, IntArrayWritable>{

    private Text word = new Text();    
    private IntArrayWritable mapv = new IntArrayWritable();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {        

      String        line     = value.toString();
      String[]      valArray = line.split(",");

      if(valArray.length == 6){
          IntWritable[] valInts  = new IntWritable[2]; 

          word.set(valArray[0]+"-"+valArray[1]);

          valInts[0] = new IntWritable(Integer.parseInt(valArray[2]));
          valInts[1] = new IntWritable(Integer.parseInt(valArray[4]));

          mapv.set(valInts);      

          context.write(word, mapv);  
      }
    }
  }

  public static class MacOneSensorSigCntReducer 
      extends Reducer<Text,IntArrayWritable,Text,IntArrayWritable> {    
    private Text macKey   = new Text(); 
    private IntArrayWritable macInfo = new IntArrayWritable();

    public void reduce(Text key, Iterable<IntArrayWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {      
        String[]      keyArray = key.toString().split("-");

        if(keyArray.length < 2){
            // Skip malformed keys; keyArray[1] below would otherwise throw.
            return;
        }

        String mac = keyArray[1];
        String sen = keyArray[0];

        Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
        MinuteSignalInfo minSig;
        int rssi = 0;
        int ts   = 0;
        int i    = 0;

        for (IntArrayWritable val : values) {           
            i = 0;
            for(Writable element : val.get()) {
                  IntWritable eleVal = (IntWritable)element;
                  if(i%2 == 0)
                      rssi = eleVal.get();
                  else
                      ts   = eleVal.get()/60;
                  i++;
            }

            minSig = (MinuteSignalInfo)rssiTime.get(ts);
            if(minSig == null){
                minSig = new MinuteSignalInfo();
                minSig.rssi = rssi;
                minSig.count = 1;
            }else{
                minSig.rssi  += rssi;
                minSig.count += 1;
            }           
            rssiTime.put(ts, minSig);
        }

        TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
        treeMap.putAll(rssiTime);

        macKey.set(mac);        

        i = 0;
        IntWritable[] valInts  = new IntWritable[1+treeMap.size()*3]; 

        valInts[i++] = new IntWritable(Integer.parseInt(sen));

        Collection<Integer> macs = treeMap.keySet();
        Iterator<Integer> it = macs.iterator();
        while(it.hasNext()) {
            int tsKey = it.next();
            valInts[i++] = new IntWritable(tsKey);
            valInts[i++] = new IntWritable(treeMap.get(tsKey).rssi);
            valInts[i++] = new IntWritable(treeMap.get(tsKey).count);
        }       

        macInfo.set(valInts);

        context.write(macKey, macInfo);
    }
}

  public static class MacSensorsTimeLocMapper 
      extends Mapper<Text,IntArrayWritable,Text,IntWritable> {

    private Text macInfo = new Text();
    public void map(Text key, Iterable<IntArrayWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {

        int i      = 0;
        int sensor = 0;
        int ts     = 0;
        int rssi   = 0;
        int count  = 0;  

        Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
        MinuteSignalInfo minSig;

        for (IntArrayWritable val : values) {
            i = 0;          

            for(Writable element : val.get()) {
                IntWritable eleVal = (IntWritable)element;                
                int valval   = eleVal.get();
                if(i == 0) {
                    sensor = valval;
                }else if(i%3 == 1){
                    ts = valval;
                }else if(i%3 == 2){
                    rssi = valval;
                }else if(i%3 == 0){
                    count = valval;             

                    minSig = (MinuteSignalInfo)rssiTime.get(ts);
                    if(minSig == null){
                        minSig = new MinuteSignalInfo();
                        minSig.rssi = rssi;
                        minSig.count = count;
                        minSig.sensor = sensor;

                        rssiTime.put(ts, minSig);
                    }else{
                        if((rssi/count) < (minSig.rssi/minSig.count)){
                            minSig.rssi = rssi;
                            minSig.count = count;
                            minSig.sensor = sensor;

                            rssiTime.put(ts, minSig);
                        }
                    }
                }

                i++;                
            }           
        } 

        TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
        treeMap.putAll(rssiTime);

        String macLocs = "";
        Collection<Integer> tss = treeMap.keySet();
        Iterator<Integer> it = tss.iterator();
        while(it.hasNext()) {
            int tsKey = it.next();
            macLocs += String.valueOf(tsKey) + ",";
            macLocs += String.valueOf(treeMap.get(tsKey).sensor) + ";";
        }   

       macInfo.set(macLocs);

       context.write(key, new IntWritable(10));
       //context.write(key, macInfo);
    }
  }

  public static class MacSensorsTimeLocReducer 
      extends Reducer<Text,IntArrayWritable,Text,Text> {

    private Text macInfo = new Text();
    public void reduce(Text key, Iterable<IntArrayWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {

        int i      = 0;
        int sensor = 0;
        int ts     = 0;
        int rssi   = 0;
        int count  = 0;  

        Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
        MinuteSignalInfo minSig;

        for (IntArrayWritable val : values) {
            i = 0;          

            for(Writable element : val.get()) {
                IntWritable eleVal = (IntWritable)element;                
                int valval   = eleVal.get();
                if(i == 0) {
                    sensor = valval;
                }else if(i%3 == 1){
                    ts = valval;
                }else if(i%3 == 2){
                    rssi = valval;
                }else if(i%3 == 0){
                    count = valval;             

                    minSig = (MinuteSignalInfo)rssiTime.get(ts);
                    if(minSig == null){
                        minSig = new MinuteSignalInfo();
                        minSig.rssi = rssi;
                        minSig.count = count;
                        minSig.sensor = sensor;

                        rssiTime.put(ts, minSig);
                    }else{
                        if((rssi/count) < (minSig.rssi/minSig.count)){
                            minSig.rssi = rssi;
                            minSig.count = count;
                            minSig.sensor = sensor;

                            rssiTime.put(ts, minSig);
                        }
                    }
                }

                i++;                
            }           
        } 

        TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
        treeMap.putAll(rssiTime);

        String macLocs = "";
        Collection<Integer> tss = treeMap.keySet();
       Iterator<Integer> it = tss.iterator();
       while(it.hasNext()) {
        int tsKey = it.next();
        macLocs += String.valueOf(tsKey) + ",";
        macLocs += String.valueOf(treeMap.get(tsKey).sensor) + ";";
       }    

       macInfo.set(macLocs);

       context.write(key, macInfo);
    }
  }

  public static class MacInfoTestReducer 
    extends Reducer<Text,IntArrayWritable,Text,Text> {

        private Text macInfo = new Text();
        public void reduce(Text key, Iterable<IntArrayWritable> values, 
                          Context context
                          ) throws IOException, InterruptedException {
            String tmp = ""; 

            for (IntArrayWritable val : values) {

                for(Writable element : val.get()) {
                    IntWritable eleVal = (IntWritable)element;                
                    int valval   = eleVal.get();

                    tmp += String.valueOf(valval) + " ";
                }
            }

            macInfo.set(tmp);
            context.write(key, macInfo);
        }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
      System.err.println("Usage: RawInputText <in> <intermediate> <out>");
      System.exit(2);
    }    

    /* 

    Job job = new Job(conf, "word count");
    job.setJarByClass(RawInputText.class);

    job.setMapperClass(TextToRecordMapper.class);

    job.setReducerClass(MacOneSensorSigCntReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntArrayWritable.class);

    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    job.waitForCompletion(true);

    */    

    Job secondJob = new Job(conf, "word count 2");

    secondJob.setJarByClass(RawInputText.class);

    FileInputFormat.addInputPath(secondJob, new Path(otherArgs[1]));
    secondJob.setInputFormatClass(SequenceFileInputFormat.class);

    secondJob.setMapperClass(MacSensorsTimeLocMapper.class);   
    //secondJob.setMapperClass(Mapper.class);

    secondJob.setMapOutputKeyClass(Text.class);    
    secondJob.setMapOutputValueClass(IntArrayWritable.class); 

    secondJob.setReducerClass(MacInfoTestReducer.class);

    //secondJob.setOutputKeyClass(Text.class);
    //secondJob.setOutputValueClass(IntArrayWritable.class);     

    FileOutputFormat.setOutputPath(secondJob, new Path(otherArgs[2]));

    System.exit(secondJob.waitForCompletion(true) ? 0 : 1);

  }
}

package com.hicapt.xike;

public class MinuteSignalInfo {
    public int sensor;
    public int rssi;
    public int count;

    public MinuteSignalInfo() {
        rssi   = 0;
        count  = 0;
        sensor = 0;
    }
}

package com.hicapt.xike;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;

public class IntArrayWritable extends ArrayWritable {

    public IntArrayWritable() {
        super(IntWritable.class);
    }
    /*
    public void readFields(DataInput in) throws IOException{        
        super.readFields(in);
    }

    public void write(DataOutput out) throws IOException{       
        super.write(out);
    }*/
}
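
A side note on the `com.hicapt.xike.IntArrayWritable@234265` text in the output shown earlier: that is the `ClassName@hash` form produced by `Object.toString()`, which suggests the value class in use does not supply a readable `toString()` and the text output simply prints whatever `toString()` returns. The plain-Java sketch below shows the difference. `PlainValue` and `ReadableValue` are hypothetical stand-ins, not Hadoop classes; an equivalent override in `IntArrayWritable` would be one way to get readable text output.

```java
import java.util.Arrays;

final class ToStringDemo {

    /** Hypothetical value class that inherits Object.toString(). */
    static class PlainValue {
        final int[] values;

        PlainValue(int... values) {
            this.values = values;
        }
    }

    /** Same data, but with a toString() that prints the contents. */
    static class ReadableValue extends PlainValue {
        ReadableValue(int... values) {
            super(values);
        }

        @Override
        public String toString() {
            return Arrays.toString(values);
        }
    }

    public static void main(String[] args) {
        // Prints the class name, '@', and a hex hash code (the hash varies per run).
        System.out.println(new PlainValue(1, 2, 3));
        // Prints the array contents instead.
        System.out.println(new ReadableValue(1, 2, 3)); // [1, 2, 3]
    }
}
```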
