hadoop可写读字段eofexception

kxe2p93d  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(408)

我正在实现我自己的可写hadoop二次排序,但是在运行作业时,hadoop一直在我的文件中抛出eofexception readFields 我不知道怎么了。
错误堆栈跟踪:

java.lang.Exception: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:559)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:165)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:158)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:628)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:347)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.hadoop.io.IntWritable.readFields(IntWritable.java:47)
    at writable.WikiWritable.readFields(WikiWritable.java:39)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:158)
    ... 12 more

我的代码:

package writable;

import org.apache.hadoop.io.*;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class WikiWritable implements WritableComparable<WikiWritable> {
  private IntWritable docId;
  private IntWritable position;

  public WikiWritable() {
    this.docId = new IntWritable();
    this.position = new IntWritable();
  }

  public void set(String docId, int position) {
    this.docId = new IntWritable(Integer.valueOf(docId));
    this.position = new IntWritable(position);
  }

  @Override
  public int compareTo(WikiWritable o) {
    int result = this.docId.compareTo(o.docId);
    result = result == 0 ? this.position.compareTo(o.position) : result;
    return result;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    docId.write(dataOutput);
    position.write(dataOutput); // error here
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    docId.readFields(dataInput);
    position.readFields(dataInput);
  }

  public IntWritable getDocId() {
    return docId;
  }

  public int getPosition() {
    return Integer.valueOf(position.toString());
  }
}

// Driver
public class Driver {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Path wiki = new Path(args[0]);
    Path out = new Path(args[1]);

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "myjob");

    TextInputFormat.addInputPath(job, wiki);
    TextOutputFormat.setOutputPath(job, out);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(WikiWritable.class);

    job.setJarByClass(Driver.class);
    job.setMapperClass(WordMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setReducerClass(WordReducer.class);
    job.setPartitionerClass(WikiPartitioner.class);
    job.setGroupingComparatorClass(WikiComparator.class);

    job.waitForCompletion(true);
  }
}

// Mapper.map
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] words = value.toString().split(",");
    String id = words[0];
    String[] contents = words[3].toLowerCase().replaceAll("[^a-z]+", " ").split("\\s+");

    for (int i = 0; i < contents.length; i++) {
      String word = contents[i].trim();
      word = stem(word);
      WikiWritable output = new WikiWritable();
      output.set(id, i);
      context.write(new Text(contents[i]), output);
    }
  }

// Comparator
public class WikiComparator extends WritableComparator {
  public WikiComparator() {
    super(WikiWritable.class, true);
  }

  @Override
  public int compare(WritableComparable wc1, WritableComparable wc2) {
    WikiWritable w1 = (WikiWritable) wc1;
    WikiWritable w2 = (WikiWritable) wc2;
    return w1.compareTo(w2);
  }
}

// Partitioner
public class WikiPartitioner extends Partitioner<WikiWritable, Text> {
  @Override
  public int getPartition(WikiWritable wikiWritable, Text text, int i) {
    return Math.abs(wikiWritable.getDocId().hashCode() % i);
  }
}

// Reducer
public class WordReducer extends Reducer<Text, WikiWritable, Text, Text> {
  @Override
  protected void reduce(Text key, Iterable<WikiWritable> values, Context ctx) throws IOException, InterruptedException {
    Map<String, StringBuilder> map = new HashMap<>();
    for (WikiWritable w : values) {
      String id = String.valueOf(w.getDocId());
      if (map.containsKey(id)) {
        map.get(id).append(w.getPosition()).append(".");
      } else {
        map.put(id, new StringBuilder());
        map.get(id).append(".").append(w.getPosition()).append(".");
      }
    }

    StringBuilder builder = new StringBuilder();
    map.keySet().forEach((k) -> {
      map.get(k).deleteCharAt(map.get(k).length() - 1);
      builder.append(k).append(map.get(k)).append(";");
    });

    ctx.write(key, new Text(builder.toString()));
  }
}

在构建新的 WikiWritable ,Map程序首先调用 new WikiWritable() 然后打电话过来 set(...) .
我试着换衣服 docId 以及 position 字符串和整数并使用 dataOutput.read() (我忘记了确切的方法名,但它是类似的东西)仍然不起作用。

fdbelqdn

fdbelqdn1#

tldr:你只需要移除你的 WikiComparator 完全,不打电话 job.setGroupingComparatorClass 完全。
说明:组比较器用于比较map输出键,而不是map输出值。Map输出键是 Text 对象和值是 WikiWritable 物体。这意味着传递给比较器进行反序列化的字节表示序列化 Text 物体。然而 WikiComparator 使用反射创建 WikiWritable 对象,然后尝试反序列化 Text 使用 WikiWritable.readFields 方法。这显然会导致错误的阅读,从而导致您看到的异常。
也就是说,我相信你根本不需要比较器,因为默认值 WritableComparator 做的和你的一模一样:呼叫 compareTo 传递给它的对象对的方法。
编辑:编辑 compareTo 被调用的方法是比较键,而不是值,所以它比较 Text 物体。如果你想比较和排序你的 WikiWritable 因此,您应该考虑将它们添加到复合键中。有很多关于复合键和二次排序的教程。

相关问题