在spark中编写avro文件时应该使用哪个fileoutputcommitter?

23c0lvtd  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(347)

在avro中将rdd保存到s3时,控制台中出现以下警告:
使用标准fileoutputcommitter提交工作。这是缓慢和潜在的不安全。
我还没有找到一个简单的隐式 saveAsAvroFile 因此,我四处寻找并得出以下结论:

import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

object AvroUtil {

  def write[T](
      path: String,
      schema: Schema,
      avroRdd: RDD[T],
      job: Job = Job.getInstance()): Unit = {
    val intermediateRdd = avroRdd.mapPartitions(
      f = (iter: Iterator[T]) => iter.map(new AvroKey(_) -> NullWritable.get()),
      preservesPartitioning = true
    )

    job.getConfiguration.set("avro.output.codec", "snappy")
    job.getConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")

    AvroJob.setOutputKeySchema(job, schema)

    intermediateRdd.saveAsNewAPIHadoopFile(
      path,
      classOf[AvroKey[T]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[T]],
      job.getConfiguration
    )
  }
}

我相当困惑,因为我看不出什么是不正确的,因为avro文件似乎输出正确。

ndasle7k

ndasle7k1#

您可以通过实现自己的outputfilecommitter来覆盖现有fileoutputcommitter的行为,从而使其更加高效和安全。
遵循这个链接,作者已经解释了类似的例子。

相关问题