How to load files with different delimiters each time in Pig Latin

f8rj6qna  asked on 2021-06-02  in Hadoop
Follow (0) | Answers (3) | Views (312)

The data from the input source has varying delimiters, such as `,` or `;`. Sometimes it may be `,`, and sometimes `;`. But the PigStorage function accepts only a single argument as the delimiter. How can such data [with delimiter `,` or `;`] be loaded?


nzk0hqpo1#

A = LOAD '/some/path/COMMA-DELIM-PREFIX*' USING PigStorage(',') AS (f1:chararray, ...);
B = LOAD '/some/path/SEMICOLON-DELIM-PREFIX*' USING PigStorage(';') AS (f1:chararray, ...);

C = UNION A, B;

sxissh062#

Could you check whether this works for you?
It handles all of the input files with different delimiters, and it also handles a single file that mixes different delimiters.
You can add as many delimiters as you like to the character class `[,:-]`. Example:

input1.txt
1,2,3,4

input2.txt
a-b-c-d

input3.txt
100:200:300:400

input4.txt
100,aaa-200:b

Pig script:
A = LOAD 'input*' AS line;
B = FOREACH A GENERATE FLATTEN(REGEX_EXTRACT_ALL(line,'(.*)[,:-](.*)[,:-](.*)[,:-](.*)'))  AS (f1,f2,f3,f4);
DUMP B;

Output:
(1,2,3,4)
(a,b,c,d)
(100,200,300,400)
(100,aaa,200,b)
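To see how the character class in that pattern behaves outside Pig, here is a small standalone Java sketch (class and method names are illustrative) that applies the same regular expression `(.*)[,:-](.*)[,:-](.*)[,:-](.*)` with `java.util.regex`, mirroring what REGEX_EXTRACT_ALL does per input line:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DelimiterRegexDemo {
    // The same pattern the Pig script passes to REGEX_EXTRACT_ALL:
    // each [,:-] character class matches a comma, a colon, or a hyphen.
    private static final Pattern LINE =
            Pattern.compile("(.*)[,:-](.*)[,:-](.*)[,:-](.*)");

    // Returns the four captured fields, or null if the line
    // does not contain exactly three delimiter characters.
    public static String[] extract(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3), m.group(4) };
    }

    public static void main(String[] args) {
        // mixed delimiters in one line, as in input4.txt
        System.out.println(String.join(",", extract("100,aaa-200:b"))); // 100,aaa,200,b
        System.out.println(String.join(",", extract("1,2,3,4")));       // 1,2,3,4
    }
}
```

Because each line above contains exactly three delimiter characters, the greedy `(.*)` groups cannot swallow a delimiter, so the split is unambiguous.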

sg24os4d3#

You need to write your own custom loader for a custom delimiter.

Steps for writing a custom loader:

As of 0.7.0, Pig loaders extend the LoadFunc abstract class. This means they need to override four methods:

    getInputFormat() this method returns to the caller an instance of the InputFormat that the loader supports. The actual load process needs an instance to use at load time, and doesn't want to place any constraints on how that instance is created.
    prepareToRead() is called prior to reading a split. It passes in the reader used during the reads of the split, as well as the actual split. The implementation of the loader usually keeps the reader, and may want to access the actual split if needed.
    setLocation() Pig calls this to communicate the load location to the loader, which is responsible for passing that information to the underlying InputFormat object. This method can be called multiple times, so there should be no state associated with the method (unless that state gets reset when the method is called).
    getNext() Pig calls this to get the next tuple from the loader once all setup has been done. If this method returns null, Pig assumes that all information in the split passed via the prepareToRead() method has been processed. 

Please find the code below:

package Pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private static final int DEFAULT_LIMIT = 226;

    private final String delim;      // delimiter passed in from the Pig script
    private final int limit = DEFAULT_LIMIT;
    private RecordReader reader;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    public CustomLoader(String delimiter) {
        this.delim = delimiter;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null; // no more records in this split
            }
            Text value = (Text) reader.getCurrentValue();
            if (value == null) {
                return null;
            }
            String[] parts = value.toString().split(delim);
            List<String> values = new ArrayList<>();
            for (int index = 0; index < parts.length; index++) {
                if (index > limit) {
                    throw new IOException("index " + index + " is out of bounds: max index = " + limit);
                }
                values.add(parts[index]);
            }
            return tupleFactory.newTuple(values);
        } catch (InterruptedException e) {
            // add more information to the runtime exception condition
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit)
            throws IOException {
        this.reader = reader; // this loader does not need the PigSplit
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // the location is assumed to be comma-separated paths
        FileInputFormat.setInputPaths(job, location);
    }
}

Create a jar file and register it; the loader can then be used from the Pig script:

register '/home/impadmin/customloader.jar';

A = load '/pig/u.data' using Pig.CustomLoader('::') as (id,mov_id,rat,timestamp);

Data set (u.data):

196::242::3::881250949
186::302::3::891717742
22::377::1::878887116
244::51::2::880606923
166::346::1::886397596
298::474::4::884182806
115::265::2::881171488
253::465::5::891628467
305::451::::886324817
6::86::3::883603013

Now you can specify any delimiter you want.
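The heart of the loader is the `value.toString().split(delim)` call, where the constructor argument is treated as a Java regular expression. A quick standalone sketch (class name is illustrative) shows how the u.data lines are tokenized with `'::'`, including the row with an empty rating field:

```java
public class SplitDemo {
    public static void main(String[] args) {
        String delim = "::"; // same argument as Pig.CustomLoader('::')

        // a normal line from u.data: four fields
        String[] fields = "196::242::3::881250949".split(delim);
        System.out.println(fields.length); // 4
        System.out.println(fields[1]);     // 242

        // the line with a missing rating still yields four fields,
        // the third one being an empty string
        String[] sparse = "305::451::::886324817".split(delim);
        System.out.println(sparse.length);         // 4
        System.out.println("[" + sparse[2] + "]"); // []
    }
}
```

Note that because `String.split` takes a regex, delimiters with special regex meaning (such as `|` or `.`) would need escaping, e.g. `\\|`.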
