Custom input format in Hadoop and MapReduce

vu8f3i0k  posted on 2021-06-02  in  Hadoop

How do I implement a custom input format (record reader) where the key is Text and the value is Text (a paragraph)?

ecbunoof #1

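Extend FileInputFormat in the class that you want to serve as your custom input format. Suppose the class is named CustomTextInputFormat, every line of the input file holds exactly two values separated by a comma (,), and you want the first value as the key and the second as the value. The code is then as follows: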

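import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomTextInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext ctx) throws IOException, InterruptedException {
        // The framework calls initialize() on the returned reader before the
        // split is used, so it should not be called again here.
        return new CustomTextRecordReader();
    }

}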

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

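// Reads "key,value" lines from one split of an uncompressed text file.
public class CustomTextRecordReader extends RecordReader<Text, Text> {

    private LineReader lineReader;
    private final Text line = new Text();
    private Text key;
    private Text value;
    private long start; // offset of the first byte of this split
    private long end;   // offset of the first byte after this split
    private long pos;   // current read offset in the file

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext ctx)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration conf = ctx.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);

        start = split.getStart();
        end = start + split.getLength();
        pos = start;

        // Let I/O errors propagate so the task fails instead of running
        // with a null reader.
        FSDataInputStream in = fs.open(path);
        in.seek(start);
        lineReader = new LineReader(in, conf);

        // A split that does not begin at offset 0 usually starts mid-line.
        // That line is read by the previous split, so skip it here.
        if (start != 0) {
            pos += lineReader.readLine(new Text());
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Read the line that begins at or before 'end'; the next split
        // skips its first line, so no record is lost or read twice.
        if (pos > end) {
            return false;
        }
        int bytesRead = lineReader.readLine(line);
        if (bytesRead <= 0) {
            return false; // end of file
        }
        pos += bytesRead;

        String[] parts = line.toString().split(",");
        if (parts.length != 2) {
            // Fail loudly rather than returning true with stale key/value.
            throw new IOException("Expected exactly two comma-separated values: " + line);
        }
        key = new Text(parts[0].trim());
        value = new Text(parts[1]);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (end == start) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException {
        // LineReader.close() also closes the underlying input stream.
        if (lineReader != null) {
            lineReader.close();
        }
    }

}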
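
To have a job actually use these two classes, the input format has to be registered in the driver. The following is only a minimal sketch: the names CustomInputDriver and PassThroughMapper are made up for illustration, the job is assumed to be map-only, and the input and output paths are assumed to arrive as the first two command-line arguments.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver; only CustomTextInputFormat comes from the answer above.
public class CustomInputDriver {

    // Hypothetical mapper: receives the Text key and Text value produced by
    // the custom record reader and writes them out unchanged.
    public static class PassThroughMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "custom input format demo");
        job.setJarByClass(CustomInputDriver.class);

        // Tell the framework to build splits and record readers with the custom class.
        job.setInputFormatClass(CustomTextInputFormat.class);

        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0); // assumption: map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path (assumed argument)
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path (assumed argument)

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The mapper's input types must match the type parameters of the input format, here Text and Text.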

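The main logic lives in the nextKeyValue method: read a line from the file, then apply whatever logic you need to build the key and the value. For reference, here is the code of the Mapper's run method, which the framework executes to fetch each key and value and pass them to the map method: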

public void run(Context context) throws IOException, InterruptedException {
     setup(context);
     while (context.nextKeyValue()) {
         map(context.getCurrentKey(), context.getCurrentValue(), context);
     }
     cleanup(context);
}
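
The same pull contract can be tried out without a cluster. The sketch below is plain Java, not Hadoop code: InMemoryReader is a hypothetical stand-in that offers the same three methods over an in-memory string, and drain plays the role of the while-loop above. It also splits on the first comma only, which is an assumption made here so that a paragraph value containing commas stays in one piece; the record reader in the answer requires exactly two comma-separated fields.

```java
import java.util.ArrayList;
import java.util.List;

class PullLoopSketch {

    // Hypothetical stand-in for a record reader: nextKeyValue() advances,
    // getCurrentKey()/getCurrentValue() expose the record just read.
    static class InMemoryReader {
        private final String[] lines;
        private int next = 0;
        private String key;
        private String value;

        InMemoryReader(String data) {
            this.lines = data.isEmpty() ? new String[0] : data.split("\n");
        }

        boolean nextKeyValue() {
            if (next >= lines.length) {
                return false; // no more records
            }
            String line = lines[next++];
            int comma = line.indexOf(',');
            if (comma < 0) {
                throw new IllegalArgumentException("No comma separator in line: " + line);
            }
            // Split on the first comma only, so the value may contain commas.
            key = line.substring(0, comma).trim();
            value = line.substring(comma + 1).trim();
            return true;
        }

        String getCurrentKey() {
            return key;
        }

        String getCurrentValue() {
            return value;
        }
    }

    // Mirrors the loop in run(): advance first, then fetch the key and value.
    static List<String> drain(String data) {
        InMemoryReader reader = new InMemoryReader(data);
        List<String> out = new ArrayList<>();
        while (reader.nextKeyValue()) {
            out.add(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        return out;
    }

    public static void main(String[] args) {
        String data = "id1, first paragraph, with a comma\nid2, second paragraph";
        for (String record : drain(data)) {
            System.out.println(record);
        }
    }
}
```

Because the loop only asks for the current key and value after nextKeyValue returns true, the reader has to set both fields before returning true and leave them alone afterwards.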
