hadoop具有推测性执行的多个输出

3b6akqbq  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(328)

我有一个任务,将avro输出写入由输入记录的几个字段组织的多个目录中。

For example : 
Process records of countries across years 
and write in a directory structure of country/year 
eg:
outputs/usa/2015/outputs_usa_2015.avro 
outputs/uk/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context);
....
....
     multipleOutputs.write("output", avroKey, NullWritable.get(), 
            OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear());

下面的代码将使用什么输出提交程序来编写输出。与推测性执行一起使用是否不安全?对于推测性执行,这会导致(可能会导致)org.apache.hadoop.hdfs.server.namenode.leaseexpiredexception
在这篇hadoop reducer文章中:如何使用推测执行输出到多个目录?建议使用自定义输出提交程序
下面来自hadoop avromultipleoutputs的代码没有说明任何推测性执行的问题

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
          String baseFileName) throws IOException, InterruptedException {

    writer =
                ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
                    taskContext.getConfiguration())).getRecordWriter(taskContext);
...
}

如果baseoutput路径在作业目录之外,write方法也不会记录任何问题

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

在作业目录外写入时,avromultipleoutputs(另一个输出)是否存在推测性执行的实际问题?如果,那么我如何重写avromultipleoutputs来拥有它自己的输出提交程序。在avromultipleoutputs中,我看不到它使用哪个输出提交程序的outputformat

e0bqpujr

e0bqpujr1#

AvroMultipleOutputs 将使用 OutputFormat 在添加命名输出时已注册到作业配置,例如使用 addNamedOutput api来自 AvroMultipleOutputs (例如。 AvroKeyValueOutputFormat ).
AvroMultipleOutputs ,您可能无法使用推测性任务执行功能。即使覆盖它也不会有帮助,也不简单。
你应该自己写 OutputFormat (最有可能扩展一种可用的avro输出格式,例如。 AvroKeyValueOutputFormat ),并重写/实现其 getRecordWriter api,它将返回一个 RecordWriter 示例说 MainRecordWriter (仅供参考)。
这个 MainRecordWriter 会保留一张 RecordWriter (例如。 AvroKeyValueRecordWriter )示例。每一个 RecordWriter 示例将属于其中一个输出文件。在 write 的api MainRecordWriter ,你会得到实际的 RecordWriter 示例,并使用此记录编写器编写记录。所以呢 MainRecordWriter 只是在多个 RecordWriter 示例。
对于一些类似的实现,您可能希望从 piggybank 图书馆。

cvxl0en2

cvxl0en22#

将命名输出添加到 AvroMultipleOutputs ,它将调用 AvroKeyOutputFormat.getRecordWriter() 或者 AvroKeyValueOutputFormat.getRecordWriter() ,哪个电话 AvroOutputFormatBase.getAvroFileOutputStream() ,其内容为

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
  Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
    getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
  return path.getFileSystem(context.getConfiguration()).create(path);
}

以及 AvroOutputFormatBase 延伸 FileOutputFormat (the) getOutputCommitter() 在上述方法中,实际上是对 FileOutputFormat.getOutputCommitter() . 因此, AvroMultipleOutputs 应具有与相同的约束 MultipleOutputs .

相关问题