Hadoop streaming with a Java mapper/reducer

ebdffaop asked on 2021-06-03 in Hadoop

I am trying to run a Hadoop streaming job with a Java mapper/reducer over some Wikipedia dump files (in compressed bz2 form). I am trying to use WikiHadoop, an interface recently released by Wikimedia.
WikiReader_Mapper.java

package courseproj.example;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article occurrence.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {
    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}

WikiReader_Reducer.java

package courseproj.example;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer: sums up all the counts.
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    // Reuse the output value object across calls.
    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }
}

The command I am executing is:

hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
  -libjars lib2/wikihadoop-0.2.jar \
  -D mapreduce.input.fileinputformat.split.minsize=300000000 \
  -D mapreduce.task.timeout=6000000 \
  -D org.wikimedia.wikihadoop.previousRevision=false \
  -input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
  -output out \
  -inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
  -mapper WikiReader_Mapper \
  -reducer WikiReader_Reducer

The error message I receive is:

Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)
Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)

I am more familiar with the new Hadoop API than with the old one. Since my mapper and reducer code lives in two separate files, where do I define the JobConf configuration parameters for the job while still following the command structure of Hadoop streaming (which sets the mapper and reducer classes explicitly)? Is there a way to package the mapper and reducer code into a single class (one that extends Configured and implements Tool, as is done in the new API) and pass that class name to the Hadoop streaming command line, instead of setting the map and reduce classes separately?
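To make the question concrete, here is an untested sketch of what I imagine such a driver could look like, written against the old (mapred) API. The driver class name WikiReader and the jar name below are placeholders of mine, and I am assuming StreamWikiDumpInputFormat implements the old-API InputFormat (streaming itself seems to require that):

package courseproj.example;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver that bundles the job configuration in one class, so the
// job can be submitted as a regular jar instead of through hadoop-streaming.
public class WikiReader extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // ToolRunner has already folded generic options (-D, -libjars) into getConf().
        JobConf conf = new JobConf(getConf(), WikiReader.class);
        conf.setJobName("wiki article count");

        conf.setMapperClass(WikiReader_Mapper.class);
        conf.setReducerClass(WikiReader_Reducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        // Assumption: StreamWikiDumpInputFormat implements the old mapred InputFormat.
        conf.setInputFormat(org.wikimedia.wikihadoop.StreamWikiDumpInputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WikiReader(), args));
    }
}

If that is viable, I would submit it with something like:

hadoop jar wikireader.jar courseproj.example.WikiReader \
  -libjars lib2/wikihadoop-0.2.jar \
  -D mapreduce.task.timeout=6000000 \
  enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 out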


zi8p0yeb #1

Streaming uses the old API (org.apache.hadoop.mapred), but your mapper and reducer classes extend classes from the new API (org.apache.hadoop.mapreduce).
Try changing your mapper to implement org.apache.hadoop.mapred.Mapper, and your reducer to implement org.apache.hadoop.mapred.Reducer, for example:

package courseproj.example;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article occurrence.
public class WikiReader_Mapper implements Mapper<Text, Text, Text, IntWritable> {
    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }

    // No-op lifecycle methods the old-API Mapper interface requires.
    public void configure(JobConf job) {}
    public void close() throws IOException {}
}
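The reducer needs the same treatment; a sketch along the same lines:

package courseproj.example;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer: sums up all the counts for a key.
public class WikiReader_Reducer implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }

    // No-op lifecycle methods the old-API Reducer interface requires.
    public void configure(JobConf job) {}
    public void close() throws IOException {}
}

Implementing the interfaces directly means supplying the no-op configure and close methods; alternatively, extending MapReduceBase (which also lives in org.apache.hadoop.mapred and provides those defaults) avoids that boilerplate.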
