自定义文件格式

mmvthczy  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(435)

我刚开始接触自定义文件格式。我正在尝试用 MapReduce 实现一个 wordcount 程序，
用到了以下类：
WordKey.class（以单词为键）
MyInputFormat.class
MyRecordReader.class
MyOutputFormat.class
MyRecordWriter.class
MyMapper.class
MyReducer.class
我是这样实现 Mapper 的：`public class MyMapper extends Mapper<WordKey,WordValue,Text,IntWritable>{ //其余业务逻辑 }`。我的问题是：
Mapper 的输入键和输入值能否像上面代码中那样使用自定义类型？或者有没有其他方法可以达到同样的目的？
请指教。
提前感谢 :)
以下是 Mapper 和 Reducer 的代码


**Mapper**

  public class MyMapper extends Mapper<StudentKey, PassValue,PassValue,IntWritable> {
        public void map(StudentKey key,PassValue value,Context context)throws IOException,InterruptedException
        {

                context.write(new PassValue(value.getResStatus()),new IntWritable(1));

        }
    }

下面是 Reducer 的代码

public class MyReducer extends
        Reducer<PassValue, IntWritable, Text, IntWritable> {
int sum=0;
public void reduce(PassValue key,Iterable<IntWritable>value,Context context)throws IOException,InterruptedException{
    for(IntWritable x:value)
    {
        sum+=x.get();
    }
    context.write(new Text(key.toString()), new IntWritable(sum));
}
}

recordreader的代码

/**
 * Reads comma-separated lines of the form {@code name,roll,status}.
 *
 * <p>Each line becomes a {@code StudentKey} (name and roll) and a {@code PassValue} (status).
 * Line splitting and progress reporting are delegated to a wrapped {@code LineRecordReader}. One
 * key instance and one value instance are created lazily and then reused for every record.
 */
public class MyRecordReader extends RecordReader<StudentKey, PassValue> {
    private StudentKey key;
    private PassValue value;
    private LineRecordReader lineReader = new LineRecordReader();

    /** Opens the wrapped line reader on the given split. */
    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        lineReader.initialize(is, tac);
    }

    /**
     * Advances to the next line and parses it into the current key/value.
     *
     * <p>A line with fewer than three comma-separated fields, or a non-numeric roll field, makes
     * this method throw (array index / number format exception).
     *
     * @return {@code true} if a record was read, {@code false} at the end of the split
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineReader.nextKeyValue()) {
            return false;
        }
        if (key == null) {
            key = new StudentKey();
        }
        if (value == null) {
            value = new PassValue();
        }
        String[] fields = lineReader.getCurrentValue().toString().split(",");
        key.setName(new Text(fields[0]));
        // The String overload of setRoll parses the text into an IntWritable.
        key.setRoll(fields[1]);
        value.setResStatus(fields[2]);
        return true;
    }

    /** Returns the key parsed by the last successful {@link #nextKeyValue()} call. */
    @Override
    public StudentKey getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /** Returns the value parsed by the last successful {@link #nextKeyValue()} call. */
    @Override
    public PassValue getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /** Reports the wrapped line reader's progress through the split. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineReader.getProgress();
    }

    /** Closes the wrapped line reader. */
    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}

recordwriter的代码

/**
 * Writes each {@code (status, count)} pair as one UTF-8 line of the form {@code status,count},
 * terminated by {@code \r\n}.
 */
public class MyRecordWriter extends RecordWriter<PassValue, IntWritable> {
    private final DataOutputStream out;

    /**
     * @param out destination stream; it is closed by {@link #close(TaskAttemptContext)}
     */
    public MyRecordWriter(DataOutputStream out) {
        super();
        this.out = out;
    }

    /**
     * @param fileOut destination file stream; it is closed by {@link #close(TaskAttemptContext)}
     */
    public MyRecordWriter(FSDataOutputStream fileOut) {
        this.out = fileOut;
    }

    /** Closes the underlying stream. */
    @Override
    public void close(TaskAttemptContext tac) throws IOException,
            InterruptedException {
        out.close();
    }

    /**
     * Writes one {@code key,value} line.
     *
     * @param arg0 the key; its {@code toString()} form is written
     * @param arg1 the count written after the comma
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void write(PassValue arg0, IntWritable arg1) throws IOException,
            InterruptedException {
        // Each line carries only the current pair. Keeping earlier values in a field made every
        // line repeat all counts written before it.
        String line = arg0.toString() + "," + arg1.get() + "\r\n";
        // DataOutputStream.writeBytes() keeps only the low byte of each char and so corrupts
        // non-ASCII text; encode explicitly as UTF-8 instead.
        out.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}

outputformat的代码

/**
 * File output format that writes reducer output through {@code MyRecordWriter}.
 *
 * <p>The file is created inside the task attempt's work directory (the parent of
 * {@link #getDefaultWorkFile}), not directly in the job output directory. The output committer
 * therefore promotes it only when the task commits, so failed or speculative attempts cannot
 * overwrite finished output. The file name is made unique per task via {@link #getUniqueFile}
 * (e.g. {@code result-r-00000.txt}), so several reducers no longer clobber a single
 * {@code result.txt}.
 */
public class MyOutputFormat extends FileOutputFormat<PassValue, IntWritable> {

    /**
     * Creates the per-task output file and wraps it in a {@code MyRecordWriter}.
     *
     * @param tac the task attempt context; also used as the progress reporter for the file
     * @return a record writer that owns (and closes) the created file
     * @throws IOException if the file cannot be created
     */
    @Override
    public RecordWriter<PassValue, IntWritable> getRecordWriter(
            TaskAttemptContext tac) throws IOException, InterruptedException {
        Path workDir = getDefaultWorkFile(tac, ".txt").getParent();
        Path fullPath = new Path(workDir, getUniqueFile(tac, "result", ".txt"));

        FileSystem fs = fullPath.getFileSystem(tac.getConfiguration());
        FSDataOutputStream fileOut = fs.create(fullPath, tac);
        return new MyRecordWriter(fileOut);
    }
}

自定义键（key）类的代码

/**
 * Composite key made of a student's name and roll number.
 *
 * <p>Keys are ordered by name, then by roll number. Serialized form: {@code name} followed by
 * {@code roll}.
 */
public class StudentKey implements WritableComparable<StudentKey> {
    private Text name;
    private IntWritable roll;

    /**
     * Creates an empty key (empty name, roll 0).
     *
     * <p>Both fields are allocated here because Writable deserialization instantiates the class
     * through the no-arg constructor and then calls {@link #readFields}, which reads into the
     * existing field objects; leaving them null caused a NullPointerException.
     */
    public StudentKey() {
        this(new Text(), new IntWritable());
    }

    /**
     * @param name the student's name
     * @param roll the student's roll number
     */
    public StudentKey(Text name, IntWritable roll) {
        super();
        this.name = name;
        this.roll = roll;
    }

    /**
     * Creates a key with the given name and roll number 0.
     *
     * <p>The roll is initialised so that {@link #write}, {@link #readFields} and
     * {@link #compareTo} do not hit a null field.
     *
     * @param name the student's name
     */
    public StudentKey(Text name) {
        this(name, new IntWritable());
    }

    /**
     * @return the name
     */
    public Text getName() {
        return name;
    }

    /**
     * @param name the name to set
     */
    public void setName(Text name) {
        this.name = name;
    }

    /**
     * @return the roll
     */
    public IntWritable getRoll() {
        return roll;
    }

    /**
     * @param roll the roll to set
     */
    public void setRoll(IntWritable roll) {
        this.roll = roll;
    }

    /**
     * Sets the roll number from its decimal text form.
     *
     * @param roll the roll number as a string
     * @throws NumberFormatException if {@code roll} is not a valid int
     */
    public void setRoll(String roll) {
        this.roll = new IntWritable(Integer.parseInt(roll));
    }

    /** Reads {@code name} then {@code roll}, in the order {@link #write} emits them. */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.name.readFields(arg0);
        this.roll.readFields(arg0);
    }

    /** Writes {@code name} then {@code roll}. */
    @Override
    public void write(DataOutput arg0) throws IOException {
        this.name.write(arg0);
        this.roll.write(arg0);
    }

    /** Hash over name and roll; consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((roll == null) ? 0 : roll.hashCode());
        return result;
    }

    /** Two keys are equal when both name and roll are equal (null-safe). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        StudentKey other = (StudentKey) obj;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        if (roll == null) {
            if (other.roll != null)
                return false;
        } else if (!roll.equals(other.roll))
            return false;
        return true;
    }

    /** Orders by name, then by roll number. */
    @Override
    public int compareTo(StudentKey arg0) {
        int comp = this.name.compareTo(arg0.name);
        if (comp != 0) {
            return comp;
        }
        return this.roll.compareTo(arg0.roll);
    }
}

自定义值类

public class PassValue implements Writable {
    private Text resStatus;
    public PassValue()
    {
        resStatus=new Text();
    }
    public PassValue(Text resStatus)
    {
        super();
        this.resStatus=resStatus;
    }
    public PassValue(PassValue status)
    {
        super();
        this.resStatus=status.resStatus;
    }
    /**
     * @return the resStatus
     */
    public Text getResStatus() {
        return resStatus;
    }
    /**
     * @param resStatus the resStatus to set
     */
    public void setResStatus(Text resStatus) {
        this.resStatus = resStatus;
    }
    //String implementation
    public void setResStatus(String resStatus) {
        this.resStatus = new Text(resStatus);
    }
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.resStatus.readFields(arg0);
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        this.resStatus.write(arg0);
    }
    /* (non-Javadoc)
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((resStatus == null) ? 0 : resStatus.hashCode());
        return result;
    }
    /* (non-Javadoc)
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PassValue other = (PassValue) obj;
        if (resStatus == null) {
            if (other.resStatus != null)
                return false;
        } else if (!resStatus.equals(other.resStatus))
            return false;
        return true;
    }

}

驱动程序代码

/**
 * Job driver: reads student records with {@code MyInputFormat}, counts records per result status
 * with {@code MyMapper}/{@code MyReducer}, and writes the counts with {@code MyOutputFormat}.
 *
 * <p>Usage: {@code DriverCode <input path> <output path>}. The output path is deleted before the
 * job runs.
 */
public class DriverCode extends Configured implements Tool {
    /** Configuration used by the most recent {@link #run} call. */
    static Configuration cf;

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param arg0 {@code [inputPath, outputPath]}
     * @return 0 on success, 1 if the job failed, 2 on bad arguments
     */
    @Override
    public int run(String[] arg0) throws IOException, InterruptedException, ClassNotFoundException {
        if (arg0 == null || arg0.length < 2) {
            System.err.println("Usage: DriverCode <input path> <output path>");
            return 2;
        }
        // Use the configuration ToolRunner prepared (including any -D generic options) rather
        // than a fresh one; fall back to a default when run() is called without setConf().
        cf = getConf() != null ? getConf() : new Configuration();
        Job j = Job.getInstance(cf);
        j.setJarByClass(DriverCode.class);
        j.setMapperClass(MyMapper.class);
        j.setMapOutputKeyClass(PassValue.class);
        j.setMapOutputValueClass(IntWritable.class);
        j.setReducerClass(MyReducer.class);
        // Final output types must match MyOutputFormat, which writes (PassValue, IntWritable).
        j.setOutputKeyClass(PassValue.class);
        j.setOutputValueClass(IntWritable.class);
        j.setInputFormatClass(MyInputFormat.class);
        j.setOutputFormatClass(MyOutputFormat.class);
        Path op = new Path(arg0[1]);
        FileInputFormat.addInputPath(j, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(j, op);
        // Remove output from a previous run so the job's existing-output-directory check passes.
        op.getFileSystem(cf).delete(op, true);

        return j.waitForCompletion(true) ? 0 : 1;
    }

    /** Entry point: parses generic Hadoop options via ToolRunner, then calls {@link #run}. */
    public static void main(String args[]) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DriverCode(), args);
        System.exit(res);
    }
}
w6lpcovy

w6lpcovy1#

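可以这样做，但您必须编写自己的自定义 InputFormat 和 RecordReader 类。更好的做法是让 Mapper 使用默认输入（键为行偏移量，值为整行文本），在 Mapper 中完成转换，再用自定义的 Writable 类作为 Mapper 输出的键和值。注意：作为 Mapper 输出键的类必须实现 WritableComparable，而不仅仅是 Writable，否则框架无法对键进行排序。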

相关问题