我正在处理一个mapreduce(仅Map任务),它读取json文件并从json输入中提取元素。输入数据:
{"type":"cloud_monitor","format":"default","version":"1.0","id":"71101cb85441995d11a43bb","start":"1413585245.921","cp":"254623","message":{"proto":"http","protoVer":"1.1","status":"403","cliIP":"23.79.231.14","reqPort":"80","reqHost":"ksd.metareactor.com","reqMethod":"GET","reqPath":"%2findex.php","reqQuery":"path%3d57%26product_id%3d49%26route%3d%255Cwinnt%255Cwin.ini%2500.","respCT":"text/html","respLen":"286","bytes":"286","UA":"mozilla-saturn","fwdHost":"origin-demo2-akamaized.scoe-sil.net"},"reqHdr":{"accEnc":"gzip,%20deflate","cookie":"PHPSESSID%3dkkqoodvfe0rt9l7lbvqghk6e15%3bcurrency%3dUSD%3blanguage%3den"}}
我已经为json数组声明了字符串变量:message&reqhdr,您可以在context.write()方法中看到它们
Map类:
public class JsonMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String type;
String format;
String version;
String id;
String start;
String cp;
// variables for message and reqHdr
String[] line = value.toString().split("\\n");
if (line.length > 0) {
for(int i=0; i<line.length; i++) {
try {
JSONObject jsonobj = new JSONObject(line[i]);
type = (String) jsonobj.get("type");
format = (String) jsonobj.get("format");
version = (String) jsonobj.get("version");
id = (String) jsonobj.get("id");
start = (String) jsonobj.get("start");
cp = (String) jsonobj.get("cp");
// Message Variable array
JSONArray messageArray = (JSONArray) jsonobj.get("message");
for(int j=0; j<messageArray.length(); j++) {
JSONObject jsonmessageobject = messageArray.getJSONObject(j);
proto = jsonmessageobject.getString("proto");
protoVer = jsonmessageobject.getString("protoVer");
cliIP = jsonmessageobject.getString("cliIP");
reqPort = jsonmessageobject.getString("reqPort");
reqHost = jsonmessageobject.getString("reqHost");
reqMethod = jsonmessageobject.getString("reqMethod");
reqPath = jsonmessageobject.getString("reqPath");
reqQuery = jsonmessageobject.getString("reqQuery");
reqCT = jsonmessageobject.getString("reqCT");
reqLen = jsonmessageobject.getString("reqLen");
sslVer = jsonmessageobject.getString("sslVer");
status = jsonmessageobject.getString("status");
redirURL = jsonmessageobject.getString("redirURL");
respCT = jsonmessageobject.getString("respCT");
respLen = jsonmessageobject.getString("respLen");
bytes = jsonmessageobject.getString("bytes");
UA = jsonmessageobject.getString("UA");
fwdHost = jsonmessageobject.getString("fwdHost");
}
// reqHdr variable array
JSONArray reqHdrArray = (JSONArray) jsonobj.get("reqHdr");
for(int k=0; k<reqHdrArray.length(); k++) {
JSONObject jsonreqHdrobject = reqHdrArray.getJSONObject(i);
accEnc = jsonreqHdrobject.getString("accEnc");
accLang = jsonreqHdrobject.getString("accLang");
auth = jsonreqHdrobject.getString("auth");
reqHdr_cacheCtl = jsonreqHdrobject.getString("cacheCtl");
reqHdr_conn = jsonreqHdrobject.getString("conn");
reqHdr_contMD5 = jsonreqHdrobject.getString("contMD5");
cookie = jsonreqHdrobject.getString("cookie");
DNT = jsonreqHdrobject.getString("DNT");
expect = jsonreqHdrobject.getString("expect");
ifMatch = jsonreqHdrobject.getString("ifMatch");
ifMod = jsonreqHdrobject.getString("ifMod");
ifNone = jsonreqHdrobject.getString("ifNone");
ifRange = jsonreqHdrobject.getString("ifRange");
ifUnmod = jsonreqHdrobject.getString("ifUnmod");
range = jsonreqHdrobject.getString("range");
referer = jsonreqHdrobject.getString("referer");
te = jsonreqHdrobject.getString("te");
upgrade = jsonreqHdrobject.getString("upgrade");
reqHdr_via = jsonreqHdrobject.getString("via");
xFrwdFor = jsonreqHdrobject.getString("xFrwdFor");
xReqWith = jsonreqHdrobject.getString("xReqWith");
}
context.write(new Text("cloud_monitor"), new Text(type + format + version + id + start + cp + proto + protoVer + cliIP + reqPort +
reqHost + reqMethod + reqPath + reqQuery + reqCT + reqLen + sslVer + status + redirURL + respCT + respLen + bytes + UA + fwdHost + accEnc + accLang + auth +
reqHdr_cacheCtl + reqHdr_conn + reqHdr_contMD5 + cookie + DNT + expect + ifMatch + ifMod + ifNone + ifRange + ifUnmod + range + referer + te +
upgrade + reqHdr_via + xFrwdFor + xReqWith ));
} catch (JSONException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
驾驶员等级:
public class JsonDriver {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJobName("Json Parser");
job.setJarByClass(com.json.driver.JsonDriver.class);
job.setMapperClass(JsonMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
}
}
当我尝试使用命令提交jar文件时
[cloudera@quickstart ~]$ hadoop jar JsonMapper.jar myjson jsonoutput
我没有看到任何错误或任何类型的输出或任何消息。我看到的只是下一个命令行
[cloudera@quickstart ~]$
我有hdfs的输入。如果我得到任何错误,我可以尝试修复它,但是在提交jar文件后,我只看到下一个命令行。谁能告诉我我在这里犯了什么错误吗?
1条答案
按热度按时间gab6jxml1#
你实际上没有提交/启动驱动程序中的作业。您需要在驱动程序末尾添加以下内容:
目前,它只会在驱动程序中运行main,而不会提交作业。