ApachePig—如何使用PigLoader将包含json字符串的平面文件作为每行的一部分处理为csv文件?

3htmauhk  于 2021-06-21  发布在  Pig
关注(0)|答案(1)|浏览(213)

我在hdfs中有一个文件
44,英国,{“names”:{“name1”:“john”,“name2”:“marry”,“name3”:“stuart”},“fruits”:{“fruit1”:“apple”,“fruit2”:“orange”},2016年7月31日
91,印度,{“names”:{“name1”:“ram”,“name2”:“sam”},“fruits”:{}},2016年7月31日
并希望使用pig loader将其存储到scv文件中,如下所示:
44,英国,姓名,姓名1,约翰,2016年7月31日
44岁,英国,姓名,姓名2,已婚,2016年7月31日
..
44,英国,水果,水果1,苹果,2016年7月31日
..
91,印度,姓名,姓名1,拉姆,2016年7月31日
..
91,印度,空,空,ram,31-07-2016
Pig的脚本应该是什么?

y3bcpkx1

y3bcpkx11#

因为您的记录不是一个正确的json字符串,所以任何json存储程序/加载程序都帮不了您。编写自定义项框架将是一种更简单的方法。
更新方法1:-
如果您正在将输入转换为制表符分隔的文件,下面的udf和pig脚本将起作用。
自定义项:-

package com.test.udf;

import org.apache.commons.lang3.StringUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**

* input format :-
* {"names":{"name1":"John","name2":"marry","name3":"stuart"},"fruits":    {"fruit1":"apple","fruit2":"orange"}}
* /

public class jsonToTuples extends EvalFunc<DataBag> {
ObjectMapper objectMapper = new ObjectMapper();
TypeReference typeRef = new TypeReference<HashMap<String, Object>>() {
};

@Override
public DataBag exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0) {
        return null;
    } else {
        String jsonRecord = (String) input.get(0);
        if (StringUtils.isNotBlank(jsonRecord)) {
            try {
                List<String> recordList = new ArrayList<String>();
                Map<String, Object> jsonDataMap = objectMapper.readValue(jsonRecord, typeRef);
                if(jsonDataMap.get("names") != null) {
                    Map<String, String> namesDataMap  = (Map<String, String>) jsonDataMap.get("names");
                    for(String key : namesDataMap.keySet()){
                        recordList.add("names" + "," + key + "," + namesDataMap.get(key));
                    }

                }
                if(jsonDataMap.get("fruits") != null) {
                    Map<String, String> fruitsDataMap  = (Map<String, String>) jsonDataMap.get("fruits");
                    for(String key : fruitsDataMap.keySet()){
                        recordList.add("fruits" + "," + key + "," + fruitsDataMap.get(key));
                    }

                }
                DataBag outputBag = BagFactory.getInstance().newDefaultBag();
                for( int i = 0 ; i < recordList.size() ; i++){
                    Tuple outputTuple = TupleFactory.getInstance().newTuple(1);
                    outputTuple.set(0 , recordList.get(i));
                    outputBag.add(outputTuple);
                }

                return outputBag;
            }catch(Exception e){
                System.out.println("caught exception for ");
                e.printStackTrace();
                return null;
            }
        }
    }
    return null;
    }
}

Pig脚本:-

register 'testUDF.jar' ;
A = load 'data.txt' using PigStorage() as (id:chararray , country:chararray , record:chararray , date:chararray);
B = Foreach A generate id, country , FLATTEN(com.test.udf.jsonToTuples(record)) , date ;
dump B ;

旧方法:-
下面我要提到的方式,我将在我的自定义项阅读你的记录,如果它是逗号分隔。
正如我在下面的评论中提到的,试着用自定义项中的拆分来分隔字段。我还没有测试,但以下是我可以尝试在我的自定义项:-
(请注意,我不确定这是最好的选择-您可能需要进一步改进。)

String[] strSplit = ((String) input.get(0)).split("," , 3);
String id = strSplit[0] ;
String country = strSplit[1] ;
String jsonWithDate = strSplit[2] ;

String[] datePart =  ((String) input.get(0)).split(",");    
String date = datePart[datePart.length-1];

/**
 * above jsonWithDate should look like -
 * {"names":{"name1":"Ram","name2":"Sam"},"fruits":{}},31-07-2016
 * 

* /

String jsonString = jsonWithDate.replace(date,"").replace(",$", "");

/**

* now use some parser or object mapper to convert jsonString to desired list of values.
* /

相关问题