我正在研究pagerank算法的实现,该算法使用hadoop、mapreduce和rdf三元组作为源代码。
到目前为止,代码非常简单,主类只有一个作业,其次是mapper和reducer。输入文件是一个.nt文件,其中包含rdf三元组,例如:
<http://dbpedia.org/resource/Anarchism> <http://dbpedia.org/ontology/wikiPageWikiLink> <http://dbpedia.org/resource/Red_Army> .
mapper应该把这些三元组Map成主客体对。对于给定的rdf,它将是:
<http://dbpedia.org/resource/Anarchism> <http://dbpedia.org/resource/Red_Army>
reducer应该将这些对分组为包含主题、基本pagerank(1)和对象列表的行。例如:
<http://dbpedia.org/resource/Anarchism> 1.0 <http://dbpedia.org/resource/Red_Army>,<http://dbpedia.org/resource/Joseph_Conrad>
我在windows上使用hadoop2.3.0。显然,它的配置是正确的,因为像wordcount这样的例子对它有效(编辑)在linux下的Hadoop2.6.0上也尝试过,没有更好的效果,结果是一样的。
我用以下命令执行jar:
hadoop jar 'C:\hwork\PageRankHadoop.jar' PageRankHadoop /in /output --all
对于大约1500行长的输入文件,执行大约需要1分钟,但它会生成空输出(包括\u success sic!)。很显然,Map器工作不正常,因为在日志中我可以看到
Map-Reduce Framework
Map input records=0
Map output records=0
Map output bytes=0
我今天花了8个小时处理这段代码,但没有得到一个输出。因此,我请求你的帮助,程序员们。
我将在代码下面粘贴更多作业执行日志,可能会有所帮助。我还注意到,在执行作业的过程中,每次作业运行Map器时,hadoop namenode都会抛出
15/04/27 21:15:59 INFO ipc.Server: Socket Reader #1 for port 9000: readAndProcess from client 127.0.0.1 threw exception [java.io.IOException: An existing connection was forcibly closed by the remote host]
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(Unknown Source)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.read(Unknown Source)
at sun.nio.ch.SocketChannelImpl.read(Unknown Source)
at org.apache.hadoop.ipc.Server.channelRead(Server.java:2502)
at org.apache.hadoop.ipc.Server.access$2800(Server.java:124)
at org.apache.hadoop.ipc.Server$Connection.readAndProcess(Server.java:1410)
at org.apache.hadoop.ipc.Server$Listener.doRead(Server.java:708)
at org.apache.hadoop.ipc.Server$Listener$Reader.doRunLoop(Server.java:582)
at org.apache.hadoop.ipc.Server$Listener$Reader.run(Server.java:553)
根据一些文章,我发现它不会毁了我的Map,但它确实看起来可疑,我不知道为什么会发生。
主要类别:
public class PageRankHadoop {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = new Job(conf, "Page Rank RDF Hadoop");
job.setJarByClass(PageRankHadoop.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(NTriplesMapper.class);
job.setReducerClass(NTriplesReducer.class);
job.setInputFormatClass(NTriplesInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
} catch (IOException | IllegalStateException | IllegalArgumentException | InterruptedException | ClassNotFoundException e) {
System.err.println("Error! " + e.getMessage());
e.printStackTrace(System.err);
}
}
}
Mapper:
public class NTriplesMapper extends Mapper<LongWritable, TripleWritable, LongWritable, Text> {
@Override
protected void map(LongWritable key, TripleWritable value, Context context) {
try {
context.write(key, new Text(value.get().getObject().getURI()));
} catch (IOException | InterruptedException ex) {
System.err.println("Mapper error: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}
}
Reducer:
public class NTriplesReducer extends Reducer<LongWritable, Text, Text, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) {
String pageRankList = "1.0";
for (Text value : values) {
pageRankList += "," + value.toString();
}
try {
context.write(new Text(key.toString()), new Text(pageRankList));
} catch (IOException | InterruptedException ex) {
System.err.println("Reducer error: " + ex.getMessage());
ex.printStackTrace(System.err);
}
}
}
shell作业执行日志:http://pastebin.com/uf0zh20h 来自hadoop\logs\userlogs的系统日志:http://pastebin.com/gncwdsr7
编辑,添加日志到代码中,没有抛出异常。还尝试在linux下的hadoop2.6.0上运行此代码,结果与windows下的hadoop2.3.0相同
2条答案
按热度按时间7cwmlq891#
hadoop版本不匹配,或者说jena版本是问题所在。其中一个依赖项太旧了,没有给出任何迹象,但使用最新版本修复了这个问题。
8mmmxcuj2#
你的代码中有一大堆可能的问题,我将尝试强调这些问题,但不清楚哪一个可能是问题的原因。
吞咽错误
第一个明显的问题是你的代码会吞噬错误:
这意味着您的作业抛出的任何错误都将被静默地抑制。至少应该将错误转储到控制台。
这是我要更改的第一件事,如果您随后开始看到错误消息,这将为您提供一个指向问题实际原因的指针。
类型签名
其次,你在使用
Triple
直接在您的Mapper
以及Reducer
. 这个Triple
类型是标准java对象,不能用作hadoopWritable
类型。要在hadoop上使用rdf数据,您需要使用apachejena elephas库(您似乎至少在部分代码中这样做)和
TripleWritable
因此不清楚为什么hadoop甚至允许您的代码编译/运行。文件读取问题
一个可能的问题是,您可能需要显式指定要递归搜索输入路径。根据此答案,在为作业设置输入路径之前,请尝试添加以下内容:
hadoop版本不匹配
您使用的是hadoop 2.3.0,而elephas是为2.6.0构建的-我不认为elephas使用了任何不向后兼容的API,但是如果所有其他方法都失败了,您可以根据使用其他hadoop版本的文档,尝试根据您的hadoop版本构建库