Hadoop RecordReader only reads the first line, then the input stream seems to be closed

Asked by 8dtrkrch on 2021-06-04 in Hadoop

I am trying to implement a Hadoop job that counts how often an object (a click) occurs in a dataset. For this I wrote a custom FileInputFormat. The RecordReader seems to read only the first line of the given file and then to close the input stream.
Here is the code:
The POJO class:

    package model;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class Click implements WritableComparable<Click> {

        private String user;
        private String clickStart;
        private String date;
        private String clickTarget;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(user);
            out.writeUTF(clickStart);
            out.writeUTF(date);
            out.writeUTF(clickTarget);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            user = in.readUTF();
            clickStart = in.readUTF();
            date = in.readUTF();
            clickTarget = in.readUTF();
        }

        public int compareTo(Click arg0) {
            int response = clickTarget.compareTo(arg0.clickTarget);
            if (response == 0) {
                response = date.compareTo(arg0.date);
            }
            return response;
        }

        public String getUser(String user) {
            return this.user;
        }

        public void setUser(String user) {
            this.user = user;
        }

        public String getClickStart() {
            return clickStart;
        }

        public void setClickStart(String clickStart) {
            this.clickStart = clickStart;
        }

        public String getDate() {
            return date;
        }

        public void setDate(String date) {
            this.date = date;
        }

        public String getClickTarget() {
            return clickTarget;
        }

        public void setClickTarget(String clickTarget) {
            this.clickTarget = clickTarget;
        }

        public String toString() {
            return clickStart + "\t" + date;
        }
    }
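
As a side note (not part of the original post), the write()/readFields() pair above can be sanity-checked outside Hadoop with plain Java streams; a minimal sketch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import model.Click;

    public class ClickRoundTrip {
        public static void main(String[] args) throws Exception {
            Click original = new Click();
            original.setUser("user1");
            original.setClickStart("/web/big-data-test-site/test-seite-1");
            original.setDate("2014-07-08");
            original.setClickTarget("ein ziel");

            // Serialize with write(), then deserialize into a fresh instance with readFields()
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            Click copy = new Click();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(copy); // prints clickStart + "\t" + date, per toString()
        }
    }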

Here is the FileInputFormat class:

    package ClickAnalysis;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    import model.Click;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.tools.ant.types.CommandlineJava.SysProperties;

    public class ClickAnalysisInputFormat extends FileInputFormat<Click, IntWritable> {

        @Override
        public RecordReader<Click, IntWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            System.out.println("Creating Record Reader");
            return new ClickReader();
        }

        public static class ClickReader extends RecordReader<Click, IntWritable> {

            private BufferedReader in;
            private Click key;
            private IntWritable value;

            @Override
            public void initialize(InputSplit inputSplit, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                key = new Click();
                value = new IntWritable(1);
                System.out.println("Starting to read ...");
                FileSplit split = (FileSplit) inputSplit;
                Configuration conf = context.getConfiguration();
                Path path = split.getPath();
                InputStream is = path.getFileSystem(conf).open(path);
                in = new BufferedReader(new InputStreamReader(is));
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                String line = in.readLine();
                System.out.println("line: " + line);
                boolean hasNextKeyValue;
                if (line == null) {
                    System.out.println("line is null");
                    hasNextKeyValue = false;
                } else {
                    String[] click = StringUtils.split(line, '\\', ';');
                    System.out.println(click[0].toString());
                    System.out.println(click[1].toString());
                    System.out.println(click[2].toString());
                    key.setClickStart(click[0].toString());
                    key.setDate(click[1].toString());
                    key.setClickTarget(click[2].toString());
                    value.set(1);
                    System.out.println("done with first line");
                    hasNextKeyValue = true;
                }
                System.out.println(hasNextKeyValue);
                return hasNextKeyValue;
            }

            @Override
            public Click getCurrentKey() throws IOException, InterruptedException {
                return this.key;
            }

            @Override
            public IntWritable getCurrentValue() throws IOException, InterruptedException {
                return this.value;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                return 0;
            }

            public void close() throws IOException {
                in.close();
                System.out.println("in closed");
            }
        }
    }
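
For context (illustration only, not the actual Hadoop MapTask code), the framework consumes a RecordReader roughly like this: nextKeyValue() is called once per record until it returns false, getCurrentKey()/getCurrentValue() are read after each successful call, and close() is invoked only when the reader reports no more records or the task fails:

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class RecordReaderContractSketch {
        // Simplified sketch of how a map task drives a RecordReader (no real context wiring).
        static <K, V> void drive(RecordReader<K, V> reader, InputSplit split,
                                 TaskAttemptContext context) throws Exception {
            reader.initialize(split, context);
            try {
                while (reader.nextKeyValue()) {       // one call per record, until it returns false
                    K key = reader.getCurrentKey();   // readers typically reuse these objects
                    V value = reader.getCurrentValue();
                    System.out.println(key + " -> " + value);
                }
            } finally {
                reader.close();
            }
        }
    }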

The Mapper class:

    package ClickAnalysis;

    import java.io.IOException;

    import model.Click;
    import model.ClickStartTarget;

    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.jruby.RubyProcess.Sys;

    public class ClickAnalysisMapper extends Mapper<Click, IntWritable, Click, IntWritable> {

        private static final IntWritable outputValue = new IntWritable();

        @Override
        protected void map(Click key, IntWritable value, Context context) throws IOException, InterruptedException {
            System.out.println("Key: " + key.getClickStart() + " " + key.getDate() + " " + key.getClickTarget() + " Value: " + value);
            outputValue.set(value.get());
            System.out.println(outputValue.get());
            context.write(key, outputValue);
            System.out.println("nach context");
        }
    }

The Partitioner class:

    package ClickAnalysis;

    import model.Click;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ClickAnalysisPartitioner extends Partitioner<Click, IntWritable> {

        @Override
        public int getPartition(Click key, IntWritable value, int numPartitions) {
            System.out.println("in Partitioner drinnen");
            int partition = numPartitions;
            return partition;
        }
    }

And the Hadoop job, which is triggered by a RESTful web service call from a servlet container (this should not be the issue, though):

    package ClickAnalysis;

    import model.Click;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class ClickAnalysisJob {

        public int run() throws Exception {
            // TODO Auto-generated method stub
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "ClickAnalysisJob");
            job.setJarByClass(ClickAnalysisJob.class);
            // Job input path
            FileInputFormat.setInputPaths(job, "hdfs://localhost:9000/user/hadoop/testdata1.csv");
            // Job output path
            Path out = new Path("hdfs://localhost:9000/user/hadoop/clickdataAnalysis_out");
            FileOutputFormat.setOutputPath(job, out);
            out.getFileSystem(conf).delete(out, true);
            job.setMapperClass(ClickAnalysisMapper.class);
            job.setReducerClass(Reducer.class);
            job.setPartitionerClass(ClickAnalysisPartitioner.class);
            //job.setReducerClass(ClickAnalysisReducer.class);
            job.setInputFormatClass(ClickAnalysisInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(Click.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapOutputKeyClass(Click.class);
            job.setMapOutputValueClass(IntWritable.class);
            System.out.println("in run drinnen");
            //job.setGroupingComparatorClass(ClickTargetAnalysisComparator.class);
            job.setNumReduceTasks(1);
            int result = job.waitForCompletion(true) ? 0 : 1;
            return result;
        }
    }
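
Since ClickAnalysisReducer is commented out in the driver above and the plain Reducer (identity) is registered instead, the reducer code is not shown. For the stated goal of counting occurrences, a reducer would typically just sum the 1s per key; a hypothetical sketch (not the original class):

    package ClickAnalysis;

    import java.io.IOException;

    import model.Click;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical summing reducer, shown only to illustrate the counting step.
    public class ClickCountReducer extends Reducer<Click, IntWritable, Click, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Click key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }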

Next, the dataset (a sample):

    /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein anderes ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein anderes ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein drittes ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein viertes ziel
    /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
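
Each record is one line with three ';'-separated fields. A quick standalone check (a sketch, assuming Hadoop's org.apache.hadoop.util.StringUtils is on the classpath) of how one such line splits into clickStart, date, and clickTarget:

    import org.apache.hadoop.util.StringUtils;

    public class SplitCheck {
        public static void main(String[] args) {
            String line = "/web/big-data-test-site/test-seite-1;2014-07-08;ein ziel";
            // split(str, escapeChar, separator): '\\' escapes, ';' delimits
            String[] fields = StringUtils.split(line, '\\', ';');
            System.out.println(fields[0]); // /web/big-data-test-site/test-seite-1 -> clickStart
            System.out.println(fields[1]); // 2014-07-08                           -> date
            System.out.println(fields[2]); // ein ziel                             -> clickTarget
        }
    }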

When I run the program, the sysout output looks like this:

    in run drinnen
    Creating Record Reader
    Starting to read ...
    line: /web/big-data-test-site/test-seite-1;2014-07-08;ein ziel
    /web/big-data-test-site/test-seite-1
    2014-07-08
    ein ziel
    done with first line
    true
    Key: /web/big-data-test-site/test-seite-1 2014-07-08 ein ziel Value: 1
    1
    in closed
    analyze Method: 1

From this I conclude that the RecordReader only reads the first line. Why does this happen, and how can it be fixed?

No answers yet.

