pig udf处理多行元组拆分为不同的Map器

bjg7j2ky  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(343)

我有一个文件,其中每个元组跨越多行,例如:

START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.

上面是我文件中的两个元组。我写的自定义项定义了 getNext() 函数检查它是否启动,然后我将初始化我的元组;如果是end,则返回元组(从字符串缓冲区);否则,我将只添加字符串到字符串缓冲区。
如果文件大小小于hdfs块大小64mb(在amazonemr上),那么它工作得很好,但是如果文件大小大于64mb,它将失败。我试着四处搜索,找到这篇博文。raja的解释很容易理解,他提供了一个示例代码。但是代码实现了 RecordReader 部分,而不是 getNext() 对于Pig LoadFunc . 只是想知道是否有人有处理多行Pig元组分裂问题的经验?我应该继续吗 RecordReader 在Pig身上?如果是,怎么做?
谢谢。

wz3gfoph

wz3gfoph1#

如果可以用start作为分隔符,可能下面的代码没有自定义项就可以工作

SET textinputformat.record.delimiter 'START';
a  =  load  '<input path>' as  (data:chararray);
dump a;

输出如下所示:

(
    name: Jim
    enter code here`phone: 2128789283
    address: 56 2nd street, New York, USA
    END
    )

    (
    name: Tom
    phone: 6308789283
    address: 56 5th street, Chicago, 13611, USA
    END
    )

现在两者都被分成两个元组。

50few1ms

50few1ms2#

你可以像盖伊提到的那样预处理你的输入,也可以应用这里描述的其他技巧。
我认为最干净的解决方案是实现一个定制的inputformat(以及它的recordreader),它创建一个记录/开始-结束。pig的loadfunc位于hadoop的inputformat之上,因此您可以定义loadfunc将使用哪种inputformat。
自定义loadfunc的原始框架实现如下所示:

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomLoader() {
        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new MyInputFormat(); //custom InputFormat
    }

    @Override
    public Tuple getNext() {
        Tuple result = null;
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            //value can be a custom Writable containing your name/value 
            //field pairs for a given record
            Object value = reader.getCurrentValue();
            result = tupleFactory.newTuple();
            // ...
            //append fields to tuple
        }
        catch (Exception e) {
            // ...
        }
        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit) 
      throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

之后 LoadFunc 初始化 InputFormat 以及它的 RecordReader ,它定位数据的输入位置并开始从recordreader获取记录,创建结果元组(getnext()),直到完全读取输入。
关于自定义输入格式的一些备注:
我将创建一个自定义输入格式,其中recordreader是 org.apache.hadoop.mapreduce.lib.input.LineRecordReader :除了 initialize() :它将调用自定义linereader(基于 org.apache.hadoop.util.LineReader ). inputformat的键应该是行偏移量(long),值应该是一个自定义的可写值。这将保存记录的字段(即开始和结束之间的数据)作为键值对的列表。每次你的阅读器 nextKeyValue() 记录被写入可由linereader写入的自定义文件。整件事的要点是如何实现 LineReader.readLine() .
另一种可能更简单的方法是更改textinputformat的分隔符(它在hadoop0.23中是可配置的,请参阅) textinputformat.record.delimiter )一个适合您的数据结构(如果可能的话)。在这种情况下,您的数据最终将进入 Text 您需要从中拆分和提取kv对,并将其转换为元组。

相关问题