如何防止hadoop作业在损坏的输入文件上失败

wwtsj6pe  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(352)

我在许多输入文件上运行hadoop作业。但如果其中一个文件被破坏,整个作业就会失败。
如何使作业忽略损坏的文件?也许为我写一些计数器/错误日志,但不是失败的整个工作

unguejic

unguejic1#

还有另一种可能的方法。你可以用 mapred.max.map.failures.percent 配置选项。当然,这种解决这个问题的方法也可能隐藏在map阶段发生的其他一些问题。

dhxwm5r4

dhxwm5r42#

这取决于作业失败的位置—如果某行已损坏,并且在map方法中的某个位置引发了异常,那么您应该能够用try/catch Package map方法的主体并记录错误:

protected void map(LongWritable key, Text value, Context context) {
  try {
    // parse value to a long
    int val = Integer.parseInt(value.toString());

    // do something with key and val..
  } catch (NumberFormatException nfe) {
    // log error and continue
  }
}

但是如果这个错误是由inputformat的recordreader抛出的,那么您需要修改Map器 run(..) 方法-who的默认实现如下所示:

public void run(Context context) {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

所以你可以修改它来捕捉 context.nextKeyValue() 调用,但是您必须小心忽略读取器抛出的任何错误—例如,ioexeption可能不会因为忽略错误而被“跳过”。
如果您已经编写了自己的inputformat/recordreader,并且您有一个表示记录失败但允许您跳过并继续解析的特定异常,那么类似的操作可能会起作用:

public void run(Context context) {
  setup(context);
  while (true) {
    try {
      if (!context.nextKeyValue()) { 
        break;
      } else {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } catch (SkippableRecordException sre) {
      // log error
    }

  }
  cleanup(context);
}

但是,为了重新编写,您的recordreader必须能够在出错时恢复,否则上面的代码可能会将您送入无限循环。
对于您的特定情况-如果您只想在第一次失败时忽略一个文件,那么可以将run方法更新为更简单的方法:

public void run(Context context) {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  } catch (Exception e) {
    // log error
  }
}

最后几句警告:
您需要确保引发异常的不是Map程序代码,否则您将因为错误的原因忽略文件
没有经过gzip压缩的gzip压缩文件实际上会在记录读取器的初始化中失败——因此上面的内容不会捕捉到这种类型或错误(您需要编写自己的记录读取器实现)。这对于在创建记录读取器期间抛出的任何文件错误都是正确的

8gsdolmq

8gsdolmq3#

这就是故障陷阱在级联中的用途:
每当操作失败并引发异常时,如果存在关联的陷阱,则会将有问题的元组保存到陷阱tap指定的资源中。这允许作业继续处理而不丢失任何数据。
这基本上可以让您的工作继续,并让您稍后检查损坏的文件
如果您对流定义语句中的级联比较熟悉:

new FlowDef().addTrap( String branchName, Tap trap );

故障陷阱

相关问题