在avro中将rdd保存到s3时,控制台中出现以下警告:
使用标准fileoutputcommitter提交工作。这是缓慢和潜在的不安全。
我还没有找到一个简单的隐式 saveAsAvroFile
因此,我四处寻找并得出以下结论:
import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
object AvroUtil {
def write[T](
path: String,
schema: Schema,
avroRdd: RDD[T],
job: Job = Job.getInstance()): Unit = {
val intermediateRdd = avroRdd.mapPartitions(
f = (iter: Iterator[T]) => iter.map(new AvroKey(_) -> NullWritable.get()),
preservesPartitioning = true
)
job.getConfiguration.set("avro.output.codec", "snappy")
job.getConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
AvroJob.setOutputKeySchema(job, schema)
intermediateRdd.saveAsNewAPIHadoopFile(
path,
classOf[AvroKey[T]],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[T]],
job.getConfiguration
)
}
}
我相当困惑,因为我看不出什么是不正确的,因为avro文件似乎输出正确。
1条答案
按热度按时间ndasle7k1#
您可以通过实现自己的outputfilecommitter来覆盖现有fileoutputcommitter的行为,从而使其更加高效和安全。
遵循这个链接,作者已经解释了类似的例子。