Passing parameters to the RecordReader in Hadoop MapReduce

yvfmudvl · posted 2021-05-29 in Hadoop

This is my code, which takes several command-line arguments:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.poi.hwpf.HWPFDocument;
    import org.apache.poi.hwpf.extractor.WordExtractor;

    public class Docsparser {

        private static String Delimiter;

        public static class DocsInputFormat extends FileInputFormat<Text, Text> {
            @Override
            public RecordReader<Text, Text> createRecordReader(InputSplit split,
                    TaskAttemptContext context) throws IOException, InterruptedException {
                return new DocsLineRecordReader();
            }
        }

        public static class DocsLineRecordReader extends RecordReader<Text, Text> {
            private Text key = new Text();
            private Text value = new Text();
            private int currentword = 0;
            private String fileline;
            private File file = null;
            private String line;
            private HWPFDocument document;
            private WordExtractor extractor = null;
            private String[] filedata;
            StringBuilder sb = new StringBuilder();

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) split;
                final Path file = fileSplit.getPath();
                Configuration conf = context.getConfiguration();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream filein = fs.open(fileSplit.getPath());
                String Delim = conf.get("Delim");
                if (filein != null) {
                    HWPFDocument document = new HWPFDocument(filein);
                    extractor = new WordExtractor(document);
                    fileline = extractor.getText();
                    filedata = fileline.split(Delim);
                }
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (key == null) {
                    key = new Text();
                }
                if (value == null) {
                    value = new Text();
                }
                if (currentword < filedata.length) {
                    for (currentword = 0; currentword < filedata.length; currentword++) {
                        sb.append(filedata[currentword] + ",");
                        line = sb.toString();
                    }
                    key.set(line);
                    value.set("");
                    return true;
                } else {
                    key = null;
                    value = null;
                    return false;
                }
            }

            @Override
            public Text getCurrentKey() throws IOException, InterruptedException {
                return key;
            }

            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                return value;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                return (100.0f / filedata.length * currentword) / 100.0f;
            }

            @Override
            public void close() throws IOException {
            }
        }

        public static class Map extends Mapper<Text, Text, Text, Text> {
            public void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
                context.write(key, value);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "Docsparser");
            job.setJarByClass(Docsparser.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(Map.class);
            job.setNumReduceTasks(0);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            Delimiter = args[2].toString();
            conf.set("Delim", Delimiter);
            job.setInputFormatClass(DocsInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Exception details:
    15/09/28 03:50:04 INFO mapreduce.Job: Task Id : attempt_1443193152998_2319_m_000000_2, Status : FAILED
    Error: java.lang.NullPointerException
        at java.lang.String.split(String.java:2272)
        at java.lang.String.split(String.java:2355)
        at com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)


Answer 1 (js81xvg6):

All configuration values must be set before the Job instance is created. Move

    Delimiter = args[2].toString();
    conf.set("Delim", Delimiter);

before

    Job job = new Job(conf, "Docsparser");
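
For illustration, a minimal sketch of the reordered main method (class names and argument positions follow the question's code; Job.getInstance is the non-deprecated replacement for the Job constructor in Hadoop 2.x):

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set every configuration value BEFORE creating the Job:
        // the Job constructor copies the Configuration, so later
        // conf.set(...) calls are invisible to the running job.
        conf.set("Delim", args[2]);
        Job job = Job.getInstance(conf, "Docsparser");
        job.setJarByClass(Docsparser.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(DocsInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

With this ordering, context.getConfiguration().get("Delim") inside initialize() returns the delimiter instead of null.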

Answer 2 (e5nszbig):

The NullPointerException occurs when split is called on the fileline string. I suspect you have not set the "Delim" configuration value, so the variable Delim is null.
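
As a defensive complement to the fix in the first answer, the record reader can also fail fast with a descriptive error, or fall back to a default, instead of letting String.split throw a NullPointerException. A sketch of the relevant part of initialize, assuming a comma would be an acceptable default delimiter:

    Configuration conf = context.getConfiguration();
    String delim = conf.get("Delim");  // returns null if "Delim" was never set
    if (delim == null) {
        // Fail fast with a clear message rather than an NPE deep in String.split
        throw new IOException("Required configuration key \"Delim\" is not set");
    }
    // Alternative: fall back to a default delimiter (assumption: "," is acceptable)
    // String delim = conf.get("Delim", ",");
    filedata = fileline.split(delim);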
