hadoop数据集mapreduce datajoin

mfuanj7w  于 2021-06-03  发布在  Hadoop
关注(0)|答案(0)|浏览(268)

代码
我正在尝试运行《Hadoop in Action》一书中的 DataJoin(多数据集连接)示例。

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. // import org.apache.commons.logging.Log;
  5. // import org.apache.commons.logging.LogFactory;
  6. import org.apache.hadoop.mapred.FileInputFormat;
  7. import org.apache.hadoop.mapred.FileOutputFormat;
  8. import org.apache.hadoop.mapred.JobClient;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapred.TextInputFormat;
  11. import org.apache.hadoop.mapred.TextOutputFormat;
  12. import org.apache.hadoop.util.GenericOptionsParser;
  13. import org.apache.hadoop.util.Tool;
  14. import org.apache.hadoop.util.ToolRunner;
  15. import org.apache.hadoop.fs.Path;
  16. import org.apache.hadoop.io.Text;
  17. import org.apache.hadoop.io.Writable;
  18. import org.apache.hadoop.conf.Configuration;
  19. import org.apache.hadoop.conf.Configured;
  20. import org.apache.hadoop.contrib.utils.join.*;
  21. public class MultiDataSetJoinMR extends Configured implements Tool
  22. {
  23. public static class MapClass extends DataJoinMapperBase
  24. {
  25. protected Text generateInputTag(String inputFile)
  26. {
  27. String datasource = inputFile.split("-")[0];
  28. return new Text(datasource);
  29. }
  30. protected Text generateGroupKey(TaggedMapOutput aRecord)
  31. {
  32. String line = ((Text) aRecord.getData()).toString();
  33. String[] tokens = line.split(",");
  34. String groupKey = tokens[0];
  35. return new Text(groupKey);
  36. }
  37. protected TaggedMapOutput generateTaggedMapOutput(Object value)
  38. {
  39. TaggedWritable retv = new TaggedWritable((Text) value);
  40. retv.setTag(this.inputTag);
  41. return retv;
  42. }
  43. }
  44. public static class Reduce extends DataJoinReducerBase
  45. {
  46. protected TaggedMapOutput combine(Object[] tags, Object[] values)
  47. {
  48. if (tags.length < 2) return null;
  49. String joinedStr = "";
  50. for (int i=0; i<values.length; i++)
  51. {
  52. if (i > 0) joinedStr += ",";
  53. TaggedWritable tw = (TaggedWritable) values[i];
  54. String line = ((Text) tw.getData()).toString();
  55. String[] tokens = line.split(",", 2);
  56. joinedStr += tokens[1];
  57. }
  58. TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
  59. retv.setTag((Text) tags[0]);
  60. return retv;
  61. }
  62. }
  63. public static class TaggedWritable extends TaggedMapOutput
  64. {
  65. private Writable data;
  66. public TaggedWritable() {
  67. this.tag = new Text();
  68. }
  69. public TaggedWritable(Writable data)
  70. {
  71. this.tag = new Text("");
  72. this.data = data;
  73. }
  74. public Writable getData()
  75. {
  76. return data;
  77. }
  78. public void write(DataOutput out) throws IOException
  79. {
  80. this.tag.write(out);
  81. this.data.write(out);
  82. }
  83. public void readFields(DataInput in) throws IOException
  84. {
  85. this.tag.readFields(in);
  86. this.data.readFields(in);
  87. }
  88. }
  89. public int run(String[] args) throws Exception
  90. {
  91. Configuration conf = getConf();
  92. JobConf job = new JobConf(conf, MultiDataSetJoinMR.class);
  93. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  94. if (otherArgs.length != 2)
  95. {
  96. System.err.println("Usage: wordcount <in> <out>");
  97. System.exit(2);
  98. }
  99. Path in = new Path(args[0]);
  100. Path out = new Path(args[1]);
  101. FileInputFormat.setInputPaths(job, in);
  102. FileOutputFormat.setOutputPath(job, out);
  103. job.setJobName("DataJoin");
  104. job.setMapperClass(MapClass.class);
  105. job.setReducerClass(Reduce.class);
  106. job.setInputFormat(TextInputFormat.class);
  107. job.setOutputFormat(TextOutputFormat.class);
  108. job.setOutputKeyClass(Text.class);
  109. job.setOutputValueClass(TaggedWritable.class);
  110. job.set("mapred.textoutputformat.separator", ",");
  111. JobClient.runJob(job);
  112. return 0;
  113. }
  114. public static void main(String[] args) throws Exception
  115. {
  116. int res = ToolRunner.run(new Configuration(),
  117. new MultiDataSetJoinMR(),
  118. args);
  119. System.exit(res);
  120. }
  121. }

运行命令
./hadoop jar multidatasetjoin.jar /home/project/dataset /home/project/out
错误
但我面临以下问题。

  1. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.metrics.jvm.JvmMetrics init
  2. INFO: Initializing JVM Metrics with processName=JobTracker, sessionId=
  3. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.JobClient configureCommandLineOptions
  4. WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
  5. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.FileInputFormat listStatus
  6. INFO: Total input paths to process : 2
  7. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
  8. INFO: Running job: job_local_0001
  9. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.FileInputFormat listStatus
  10. INFO: Total input paths to process : 2
  11. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask runOldMapper
  12. INFO: numReduceTasks: 1
  13. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
  14. INFO: io.sort.mb = 100
  15. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
  16. INFO: data buffer = 79691776/99614720
  17. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
  18. INFO: record buffer = 262144/327680
  19. 15 Mar, 2013 4:29:45 PM org.apache.hadoop.mapred.LocalJobRunner$Job run
  20. WARNING: job_local_0001
  21. java.lang.RuntimeException: Error in configuring object
  22. at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
  23. at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
  24. at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
  25. at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:354)
  26. at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
  27. at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
  28. Caused by: java.lang.reflect.InvocationTargetException
  29. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  30. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  31. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  32. at java.lang.reflect.Method.invoke(Method.java:616)
  33. at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
  34. ... 5 more
  35. Caused by: java.lang.RuntimeException: Error in configuring object
  36. at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
  37. at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
  38. at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
  39. at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:34)
  40. ... 10 more
  41. Caused by: java.lang.reflect.InvocationTargetException
  42. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  43. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  44. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  45. at java.lang.reflect.Method.invoke(Method.java:616)
  46. at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
  47. ... 13 more
  48. Caused by: java.lang.NullPointerException
  49. at MultiDataSetJoinMR$MapClass.generateInputTag(MultiDataSetJoinMR.java:31)
  50. at org.apache.hadoop.contrib.utils.join.DataJoinMapperBase.configure(DataJoinMapperBase.java:60)
  51. ... 18 more
  52. null15 Mar, 2013 4:29:46 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
  53. INFO: map 0% reduce 0%
  54. 15 Mar, 2013 4:29:46 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
  55. INFO: Job complete: job_local_0001
  56. 15 Mar, 2013 4:29:46 PM org.apache.hadoop.mapred.Counters log
  57. INFO: Counters: 0
  58. Exception in thread "main" java.io.IOException: Job failed!
  59. at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1252)
  60. at MultiDataSetJoinMR.run(MultiDataSetJoinMR.java:123)
  61. at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
  62. at MultiDataSetJoinMR.main(MultiDataSetJoinMR.java:128)
  63. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  64. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  65. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  66. at java.lang.reflect.Method.invoke(Method.java:616)
  67. at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

从堆栈跟踪可以确定,下面这个方法的 inputFile 参数收到了空值(null):

  1. protected Text generateInputTag(String inputFile)
  2. {
  3. String datasource = inputFile.split("-")[0];
  4. return new Text(datasource);
  5. }

我不知道这个方法是从哪里被调用的,也不知道该如何解决这个问题。有人能帮我吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题