Hadoop custom RecordReader implementation

gg58donl posted on 2021-06-03 in Hadoop

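I find it hard to understand the flow of the nextKeyValue() method, which is explained at the link below:
http://analyticspro.org/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/
In particular, I don't follow the for loop inside nextKeyValue().
Any help is appreciated.
Thanks in advance.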

l2osamch (answer #1)

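Whenever a new piece of data is needed, two things happen. First, the reader is asked: "Do you have any data?"
If the reader answers yes, the caller fetches the data from the getCurrentValue method (and the key from getCurrentKey).
nextKeyValue handles the first step. It simply answers the question "Do you have any more data for me?"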
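In the new MapReduce API, Mapper.run() drives this ask-then-fetch protocol with a loop of roughly this shape: `while (context.nextKeyValue()) map(context.getCurrentKey(), context.getCurrentValue(), context);`. The sketch below imitates that loop in plain Java so that it runs without Hadoop. `SimpleRecordReader`, `ListRecordReader` and `ReaderLoop` are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the RecordReader<K, V> contract (not the real Hadoop class).
interface SimpleRecordReader<K, V> {
    boolean nextKeyValue();   // "Do you have any more data for me?"
    K getCurrentKey();        // only meaningful after nextKeyValue() returned true
    V getCurrentValue();
}

// Hypothetical reader that serves the elements of a list, keyed by their index.
class ListRecordReader implements SimpleRecordReader<Long, String> {
    private final List<String> records;
    private int pos = 0;
    private Long key;
    private String value;

    ListRecordReader(List<String> records) {
        this.records = records;
    }

    @Override
    public boolean nextKeyValue() {
        if (pos >= records.size()) {
            key = null;
            value = null;
            return false;             // no more data
        }
        key = (long) pos;
        value = records.get(pos);
        pos++;
        return true;                  // key and value are ready to be fetched
    }

    @Override
    public Long getCurrentKey() { return key; }

    @Override
    public String getCurrentValue() { return value; }
}

class ReaderLoop {
    // Mirrors the Mapper.run() loop: ask first, then fetch, then "map" the pair.
    static List<String> drive(SimpleRecordReader<Long, String> reader) {
        List<String> mapped = new ArrayList<>();
        while (reader.nextKeyValue()) {
            mapped.add(reader.getCurrentKey() + ":" + reader.getCurrentValue());
        }
        return mapped;
    }

    public static void main(String[] args) {
        System.out.println(drive(new ListRecordReader(List.of("a", "b", "c"))));
        // prints [0:a, 1:b, 2:c]
    }
}
```

The caller never pulls a value unless nextKeyValue() has just returned true. A `false` return is the only end-of-data signal the loop needs.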
Because of a firewall issue I can't open the link, but here is an example implementation I have used:

// allData is filled in initialize(): record index -> parsed Invoice
HashMap<Integer, Invoice> allData = new HashMap<Integer, Invoice>();

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (startPos >= endPos) {
        // No records left in this split
        key = null;
        value = null;
        return false;
    }
    if (key == null) {
        key = new LongWritable();
    }
    key.set(startPos);               // key = index of the current record (startPos is an int)
    value = allData.get(startPos);   // value = the Invoice parsed in initialize()
    startPos = startPos + 1;         // move on to the next record
    return true;
}

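Invoice here is just a POJO. In the initialize method I parse the whole document and store it in the HashMap. nextKeyValue then checks whether a next key exists. If it does, it sets the corresponding value and returns true. Otherwise it returns false to signal that there are no more keys.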
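The same parse-everything-in-initialize approach can be tried outside Hadoop. This is a minimal sketch with several assumptions. `Invoice` is a hypothetical two-field POJO. The "document" is assumed to be one `id,amount` pair per line. `InvoiceReader` is a hypothetical class, not a Hadoop RecordReader. initialize() fills the HashMap, and nextKeyValue() walks startPos from 0 up to endPos.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical POJO standing in for the answer's Invoice class.
class Invoice {
    final String id;
    final double amount;

    Invoice(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }
}

// Hypothetical reader: parses the whole document up front, then serves one Invoice per call.
class InvoiceReader {
    private final Map<Integer, Invoice> allData = new HashMap<>();
    private int startPos;
    private int endPos;
    private Long key;
    private Invoice value;

    // Assumes one "id,amount" pair per line; blank lines are skipped.
    void initialize(String document) {
        int index = 0;
        for (String line : document.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] fields = line.split(",");
            allData.put(index++, new Invoice(fields[0].trim(), Double.parseDouble(fields[1].trim())));
        }
        startPos = 0;
        endPos = index;              // one past the last record index
    }

    boolean nextKeyValue() {
        if (startPos >= endPos) {    // nothing left: clear the pair and report "no data"
            key = null;
            value = null;
            return false;
        }
        key = (long) startPos;
        value = allData.get(startPos);
        startPos++;
        return true;
    }

    Long getCurrentKey() { return key; }
    Invoice getCurrentValue() { return value; }
}
```

Parsing everything up front keeps nextKeyValue() trivial. The trade-off is that the whole split has to fit in memory, which is usually acceptable only for small input files.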

70gysomp (answer #2)

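nextKeyValue() is the core function that sets up the key/value pair for each map call. In the code from your link, the part before the for loop just sets the key to pos, which is simply the byte offset where this record starts (key.set(pos)). It also clears out the previously set value (value.clear()). The corresponding code: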

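public boolean nextKeyValue() throws IOException, InterruptedException {
    // Create the key and value objects once and reuse them on later calls.
    if (key == null) {
        key = new LongWritable();
    }
    key.set(pos);                          // key = byte offset where this record starts
    if (value == null) {
        value = new Text();
    }
    value.clear();                         // drop the lines of the previous record
    final Text endline = new Text("\n");   // separator appended after each line
    int newSize = 0;                       // bytes consumed by the most recent readLine()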

Then comes the for loop. I have added comments to every line.

        for (int i = 0; i < NLINESTOPROCESS; i++) {
            // Since this is NLineInputFormat, it reads 3 lines at a time and sets them
            // together as the value, so this loop runs until that is satisfied.
            Text v = new Text();
            // pos is the current offset and end is the end offset of the split; stopping
            // at end prevents this reader from reading into the next split.
            while (pos < end) {
                // LineReader.readLine reads until it meets a newline (the default delimiter
                // of TextInputFormat), stores the line in v, and returns the number of bytes
                // consumed. maxLineLength (max integer size here) caps how much of one line is kept.
                newSize = in.readLine(v, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                if (newSize == 0) {
                    break; // Nothing left to read, so stop before appending anything.
                }
                // Append the whole line in v to the value; append is needed because
                // 3 lines are collected into one value.
                value.append(v.getBytes(), 0, v.getLength());
                // Add a newline after each line read.
                value.append(endline.getBytes(), 0, endline.getLength());
                pos += newSize;
                // Same check as Hadoop's LineRecordReader: a read shorter than maxLineLength
                // is a complete line, so stop; otherwise the line was too long and the loop
                // reads again.
                if (newSize < maxLineLength) {
                    break;
                }
            }
        }
        if (newSize == 0) {
            // Nothing was read: clear key and value and report that there are no more records.
            key = null;
            value = null;
            return false;
        } else {
            // Otherwise hand the record to the map call.
            return true;
        }
    } // end of nextKeyValue()
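To see what this loop produces, the sketch below replays it on an in-memory byte array instead of an HDFS stream. `NLineSplitReader` is a hypothetical class. Its `readLine` is a simplified stand-in for LineReader.readLine: it returns the bytes consumed (including the '\n'), or 0 at end of data, and it ignores maxLineLength. It also does not handle a line that straddles the split boundary, which the real LineRecordReader does. Each nextKeyValue() call uses the starting byte offset as the key and up to NLINESTOPROCESS lines as the value.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified replay of the NLinesRecordReader loop on an in-memory split.
class NLineSplitReader {
    private static final int NLINESTOPROCESS = 3;
    private final byte[] data;
    private long pos;                 // current offset inside the data
    private final long end;           // end offset of this split
    private Long key;
    private StringBuilder value;

    NLineSplitReader(byte[] data, long start, long end) {
        this.data = data;
        this.pos = start;
        this.end = end;
    }

    // Simplified readLine: appends one line (without '\n') to out and
    // returns the bytes consumed, or 0 if there is nothing left.
    private int readLine(StringBuilder out) {
        int p = (int) pos;
        if (p >= data.length) {
            return 0;
        }
        int lineEnd = p;
        while (lineEnd < data.length && data[lineEnd] != '\n') {
            lineEnd++;
        }
        out.append(new String(data, p, lineEnd - p, StandardCharsets.UTF_8));
        return (lineEnd < data.length ? lineEnd + 1 : lineEnd) - p;  // count '\n' if present
    }

    boolean nextKeyValue() {
        key = pos;                    // key = byte offset where this record starts
        value = new StringBuilder();
        for (int i = 0; i < NLINESTOPROCESS && pos < end; i++) {
            StringBuilder line = new StringBuilder();
            int newSize = readLine(line);
            if (newSize == 0) {
                break;                // end of data
            }
            value.append(line).append('\n');
            pos += newSize;
        }
        if (value.length() == 0) {    // nothing read: no more records in this split
            key = null;
            value = null;
            return false;
        }
        return true;
    }

    Long getCurrentKey() { return key; }
    String getCurrentValue() { return value.toString(); }
}
```

With the input "l1\nl2\nl3\nl4\n" and the whole array as the split, the first record has key 0 and value "l1\nl2\nl3\n". The second has key 9 and value "l4\n". If the split ends at offset 6 instead, the reader stops after "l1\nl2\n". This sketch ends on an empty value rather than on `newSize == 0`, so a short final record is never dropped.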

fcg9iug3 (answer #3)

Each mapper uses the nextKeyValue() method to iterate over all the records in its split.
The NLinesRecordReader class defines each record as 3 lines:

private final int NLINESTOPROCESS = 3;

The main job of the loop in nextKeyValue() is to collect 3 lines for each record. That record is then used as the input value of the map() method.
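Since each record holds NLINESTOPROCESS = 3 lines, a split with n lines leads to ceil(n / 3) calls to map(), and the last record may hold fewer than 3 lines. The sketch below shows that grouping on a plain list of strings. `NLineGrouping`, `group` and `mapCalls` are hypothetical names, and the sketch ignores byte offsets and split boundaries.

```java
import java.util.ArrayList;
import java.util.List;

class NLineGrouping {
    static final int NLINESTOPROCESS = 3;

    // Groups lines as the loop in nextKeyValue() does: up to 3 lines per record,
    // with the last record keeping whatever is left over.
    static List<List<String>> group(List<String> lines) {
        List<List<String>> records = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += NLINESTOPROCESS) {
            int to = Math.min(i + NLINESTOPROCESS, lines.size());
            records.add(new ArrayList<>(lines.subList(i, to)));
        }
        return records;
    }

    // Number of map() calls for a split of n lines: ceil(n / 3) in integer arithmetic.
    static int mapCalls(int n) {
        return (n + NLINESTOPROCESS - 1) / NLINESTOPROCESS;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("l1", "l2", "l3", "l4", "l5", "l6", "l7");
        System.out.println(group(lines));   // prints [[l1, l2, l3], [l4, l5, l6], [l7]]
        System.out.println(mapCalls(7));    // prints 3
    }
}
```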
