我试图读写Parquet文件作为rdd使用Spark。我不能在我当前的应用程序中使用sparksql上下文(它需要structtype中的parquet模式,当我从avro模式转换时,在少数情况下会给我castexception)
因此,如果我尝试通过重载avroparquetformat并将parquetinputformat发送到hadoop来实现和保存parquet文件,以以下方式编写:
def saveAsParquetFile[T <: IndexedRecord](records: RDD[T], path: String)(implicit m: ClassTag[T]) = {
val keyedRecords: RDD[(Void, T)] = records.map(record => (null, record))
spark.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
val job = Job.getInstance(spark.hadoopConfiguration)
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
AvroParquetOutputFormat.setSchema(job, m.runtimeClass.newInstance().asInstanceOf[IndexedRecord].getSchema())
keyedRecords.saveAsNewAPIHadoopFile(
path,
classOf[Void],
m.runtimeClass.asInstanceOf[Class[T]],
classOf[ParquetOutputFormat[T]],
job.getConfiguration
)
}
这是一个错误:
Exception in thread "main" java.lang.InstantiationException: org.apache.avro.generic.GenericRecord
我按如下方式调用函数:
val file1: RDD[GenericRecord] = sc.parquetFile[GenericRecord]("/home/abc.parquet")
sc.saveAsParquetFile(file1, "/home/abc/")
暂无答案!
目前还没有任何答案,快来回答吧!