json mapreduce中的错误

camsedfj  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(325)

我正在处理一个mapreduce(仅Map任务),它读取json文件并从json输入中提取元素。输入数据:

{"type":"cloud_monitor","format":"default","version":"1.0","id":"71101cb85441995d11a43bb","start":"1413585245.921","cp":"254623","message":{"proto":"http","protoVer":"1.1","status":"403","cliIP":"23.79.231.14","reqPort":"80","reqHost":"ksd.metareactor.com","reqMethod":"GET","reqPath":"%2findex.php","reqQuery":"path%3d57%26product_id%3d49%26route%3d%255Cwinnt%255Cwin.ini%2500.","respCT":"text/html","respLen":"286","bytes":"286","UA":"mozilla-saturn","fwdHost":"origin-demo2-akamaized.scoe-sil.net"},"reqHdr":{"accEnc":"gzip,%20deflate","cookie":"PHPSESSID%3dkkqoodvfe0rt9l7lbvqghk6e15%3bcurrency%3dUSD%3blanguage%3den"}}

我已经为json数组声明了字符串变量:message&reqhdr,您可以在context.write()方法中看到它们
Map类:

public class JsonMapper extends Mapper<LongWritable, Text, Text, Text> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String type;
    String format;
    String version;
    String id;
    String start;
    String cp;
    // variables for message and reqHdr
    String[] line = value.toString().split("\\n");
    if (line.length > 0) {
        for(int i=0; i<line.length; i++) {
            try {
                JSONObject jsonobj      = new JSONObject(line[i]);
                    type    = (String) jsonobj.get("type");
                    format  = (String) jsonobj.get("format");
                    version = (String) jsonobj.get("version");
                    id      = (String) jsonobj.get("id");
                    start   = (String) jsonobj.get("start"); 
                    cp      = (String) jsonobj.get("cp");

                // Message Variable array
                JSONArray messageArray  = (JSONArray) jsonobj.get("message");
                for(int j=0; j<messageArray.length(); j++) {
                    JSONObject jsonmessageobject = messageArray.getJSONObject(j);
                    proto       = jsonmessageobject.getString("proto");
                    protoVer    = jsonmessageobject.getString("protoVer");
                    cliIP       = jsonmessageobject.getString("cliIP");
                    reqPort     = jsonmessageobject.getString("reqPort");
                    reqHost     = jsonmessageobject.getString("reqHost");
                    reqMethod   = jsonmessageobject.getString("reqMethod");
                    reqPath     = jsonmessageobject.getString("reqPath");
                    reqQuery    = jsonmessageobject.getString("reqQuery");
                    reqCT       = jsonmessageobject.getString("reqCT");
                    reqLen      = jsonmessageobject.getString("reqLen");
                    sslVer      = jsonmessageobject.getString("sslVer");
                    status      = jsonmessageobject.getString("status");
                    redirURL    = jsonmessageobject.getString("redirURL");
                    respCT      = jsonmessageobject.getString("respCT");
                    respLen     = jsonmessageobject.getString("respLen");
                    bytes       = jsonmessageobject.getString("bytes");
                    UA          = jsonmessageobject.getString("UA");
                    fwdHost     = jsonmessageobject.getString("fwdHost");
                }

                // reqHdr variable array
                JSONArray reqHdrArray   = (JSONArray) jsonobj.get("reqHdr");
                for(int k=0; k<reqHdrArray.length(); k++) {
                    JSONObject jsonreqHdrobject = reqHdrArray.getJSONObject(i);
                    accEnc      = jsonreqHdrobject.getString("accEnc");
                    accLang     = jsonreqHdrobject.getString("accLang");
                    auth        = jsonreqHdrobject.getString("auth");
                    reqHdr_cacheCtl = jsonreqHdrobject.getString("cacheCtl");
                    reqHdr_conn = jsonreqHdrobject.getString("conn");
                    reqHdr_contMD5 = jsonreqHdrobject.getString("contMD5");
                    cookie      = jsonreqHdrobject.getString("cookie");
                    DNT         = jsonreqHdrobject.getString("DNT");
                    expect      = jsonreqHdrobject.getString("expect");
                    ifMatch     = jsonreqHdrobject.getString("ifMatch");
                    ifMod       = jsonreqHdrobject.getString("ifMod");
                    ifNone      = jsonreqHdrobject.getString("ifNone");
                    ifRange     = jsonreqHdrobject.getString("ifRange");
                    ifUnmod     = jsonreqHdrobject.getString("ifUnmod");
                    range       = jsonreqHdrobject.getString("range");
                    referer     = jsonreqHdrobject.getString("referer");
                    te          = jsonreqHdrobject.getString("te");
                    upgrade     = jsonreqHdrobject.getString("upgrade");
                    reqHdr_via  = jsonreqHdrobject.getString("via");
                    xFrwdFor    = jsonreqHdrobject.getString("xFrwdFor");
                    xReqWith    = jsonreqHdrobject.getString("xReqWith");
                }
            context.write(new Text("cloud_monitor"), new Text(type + format + version + id + start + cp + proto + protoVer + cliIP + reqPort + 
                    reqHost + reqMethod + reqPath + reqQuery + reqCT + reqLen + sslVer + status + redirURL + respCT + respLen + bytes + UA + fwdHost + accEnc   + accLang + auth + 
                    reqHdr_cacheCtl + reqHdr_conn + reqHdr_contMD5 + cookie + DNT + expect + ifMatch + ifMod + ifNone + ifRange + ifUnmod + range + referer + te +
                    upgrade + reqHdr_via + xFrwdFor + xReqWith ));
            } catch (JSONException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
}

驾驶员等级:

public class JsonDriver {
public static void main(String[] args) throws IOException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJobName("Json Parser");
    job.setJarByClass(com.json.driver.JsonDriver.class);
    job.setMapperClass(JsonMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job,  new Path(args[1]));
}
}

当我尝试使用命令提交jar文件时

[cloudera@quickstart ~]$ hadoop jar JsonMapper.jar myjson jsonoutput

我没有看到任何错误或任何类型的输出或任何消息。我看到的只是下一个命令行

[cloudera@quickstart ~]$

我有hdfs的输入。如果我得到任何错误,我可以尝试修复它,但是在提交jar文件后,我只看到下一个命令行。谁能告诉我我在这里犯了什么错误吗?

gab6jxml

gab6jxml1#

你实际上没有提交/启动驱动程序中的作业。您需要在驱动程序末尾添加以下内容:

job.waitForCompletion(true);

目前,它只会在驱动程序中运行main,而不会提交作业。

相关问题