我在许多输入文件上运行hadoop作业。但如果其中一个文件被破坏,整个作业就会失败。如何使作业忽略损坏的文件?也许为我写一些计数器/错误日志,但不是失败的整个工作
unguejic1#
还有另一种可能的方法。你可以用 mapred.max.map.failures.percent 配置选项。当然,这种解决这个问题的方法也可能隐藏在map阶段发生的其他一些问题。
mapred.max.map.failures.percent
dhxwm5r42#
这取决于作业失败的位置—如果某行已损坏,并且在map方法中的某个位置引发了异常,那么您应该能够用try/catch Package map方法的主体并记录错误:
protected void map(LongWritable key, Text value, Context context) { try { // parse value to a long int val = Integer.parseInt(value.toString()); // do something with key and val.. } catch (NumberFormatException nfe) { // log error and continue } }
但是如果这个错误是由inputformat的recordreader抛出的,那么您需要修改Map器 run(..) 方法-who的默认实现如下所示:
run(..)
public void run(Context context) { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }
所以你可以修改它来捕捉 context.nextKeyValue() 调用,但是您必须小心忽略读取器抛出的任何错误—例如,ioexeption可能不会因为忽略错误而被“跳过”。如果您已经编写了自己的inputformat/recordreader,并且您有一个表示记录失败但允许您跳过并继续解析的特定异常,那么类似的操作可能会起作用:
context.nextKeyValue()
public void run(Context context) { setup(context); while (true) { try { if (!context.nextKeyValue()) { break; } else { map(context.getCurrentKey(), context.getCurrentValue(), context); } } catch (SkippableRecordException sre) { // log error } } cleanup(context); }
但是,为了重新编写,您的recordreader必须能够在出错时恢复,否则上面的代码可能会将您送入无限循环。对于您的特定情况-如果您只想在第一次失败时忽略一个文件,那么可以将run方法更新为更简单的方法:
public void run(Context context) { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } catch (Exception e) { // log error } }
最后几句警告:您需要确保引发异常的不是Map程序代码,否则您将因为错误的原因忽略文件没有经过gzip压缩的gzip压缩文件实际上会在记录读取器的初始化中失败——因此上面的内容不会捕捉到这种类型或错误(您需要编写自己的记录读取器实现)。这对于在创建记录读取器期间抛出的任何文件错误都是正确的
8gsdolmq3#
这就是故障陷阱在级联中的用途:每当操作失败并引发异常时,如果存在关联的陷阱,则会将有问题的元组保存到陷阱tap指定的资源中。这允许作业继续处理而不丢失任何数据。这基本上可以让您的工作继续,并让您稍后检查损坏的文件如果您对流定义语句中的级联比较熟悉:
new FlowDef().addTrap( String branchName, Tap trap );
故障陷阱
3条答案
按热度按时间unguejic1#
还有另一种可能的方法。你可以用
mapred.max.map.failures.percent
配置选项。当然,这种解决这个问题的方法也可能隐藏在map阶段发生的其他一些问题。dhxwm5r42#
这取决于作业失败的位置—如果某行已损坏,并且在map方法中的某个位置引发了异常,那么您应该能够用try/catch Package map方法的主体并记录错误:
但是如果这个错误是由inputformat的recordreader抛出的,那么您需要修改Map器
run(..)
方法-who的默认实现如下所示:所以你可以修改它来捕捉
context.nextKeyValue()
调用,但是您必须小心忽略读取器抛出的任何错误—例如,ioexeption可能不会因为忽略错误而被“跳过”。如果您已经编写了自己的inputformat/recordreader,并且您有一个表示记录失败但允许您跳过并继续解析的特定异常,那么类似的操作可能会起作用:
但是,为了重新编写,您的recordreader必须能够在出错时恢复,否则上面的代码可能会将您送入无限循环。
对于您的特定情况-如果您只想在第一次失败时忽略一个文件,那么可以将run方法更新为更简单的方法:
最后几句警告:
您需要确保引发异常的不是Map程序代码,否则您将因为错误的原因忽略文件
没有经过gzip压缩的gzip压缩文件实际上会在记录读取器的初始化中失败——因此上面的内容不会捕捉到这种类型或错误(您需要编写自己的记录读取器实现)。这对于在创建记录读取器期间抛出的任何文件错误都是正确的
8gsdolmq3#
这就是故障陷阱在级联中的用途:
每当操作失败并引发异常时,如果存在关联的陷阱,则会将有问题的元组保存到陷阱tap指定的资源中。这允许作业继续处理而不丢失任何数据。
这基本上可以让您的工作继续,并让您稍后检查损坏的文件
如果您对流定义语句中的级联比较熟悉:
故障陷阱