I need to write a Spark RDD out to an Avro file with a given schema. The Avro schema contains a complex UNION type, and Spark does not seem to support complex unions.
The Avro schema looks like this:
{
  "name": "company",
  "type": "record",
  "fields": [
    {
      "name": "identifier",
      "type": [
        {"name": "uid", "type": "int"},
        {
          "name": "personal_id_code",
          "type": "record",
          "fields": [
            {"name": "code", "type": "string"},
            {"name": "year_released", "type": "string"}
          ]
        },
        {
          "name": "person_name",
          "type": "record",
          "fields": [
            {"name": "name", "type": "string"},
            {"name": "surname", "type": "string"}
          ]
        }
      ]
    },
    {
      "name": "users",
      "type": {
        "type": "array",
        "items": {
          "name": "userdata",
          "type": "record",
          "fields": [
            {"name": "uid", "type": "int"},
            {"name": "name", "type": "string"},
            {"name": "zip", "type": "int"},
            {"name": "timestamp", "type": "long"},
            {"name": "properties", "type": "int"}
          ]
        }
      }
    }
  ]
}
The RDD's data types are built around the company type:
case class personal_id_code(code: String, year_released: String)
case class person_name(name: String, surname: String)
case class identifier(uid: Int, personal_id_code: personal_id_code, name: person_name)
case class company(identifier: identifier, users: List[userdata])
case class userdata(uid: Int, name: String, zip: Int, timestamp: Long, properties: Int)
I tried a simple dataset with Spark's Avro writer:
import spark.implicits._  // needed for toDS()

val finalDs = List(
  company(
    identifier(12345678, null, null),
    List(userdata(123, "John", 123, 789L, 432), userdata(234, "Paul", 234, 890L, 543))))
  .toDS()
finalDs.show()
finalDs
.write.mode(SaveMode.Overwrite)
.option("avroSchema", userSchema)
.format("avro").save(path)
But this fails with the following exception:
Unsupported Avro UNION type ["int",{"type":"record","name":"personal_id_code","fields":[{"name":"code","type":"string"},{"name":"year_released","type":"string"}]},{"type":"record","name":"person_name","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"}]}]: Only UNION of a null type and a non-null type is supported
Later I found out that Spark does not support complex Avro unions.
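As far as I can tell from the error message, only a union of null and a single non-null type would be accepted for that field, e.g. (a simplified, hypothetical version of the field, not what I actually need):

{"name": "identifier", "type": ["null", "int"]}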
I also tried the Hadoop API:
def help(rdd: RDD[company], avroSchema: Schema)(implicit spark: SparkSession): Unit = {
  val job = new Job(spark.sparkContext.hadoopConfiguration)
  AvroJob.setOutputKeySchema(job, avroSchema)
  FileUtils.deleteDirectory(new File("C:\\Temp\\avrooutput"))
  val output = s"C:/Temp/avrooutput"
  rdd.coalesce(64).map(comp => (new AvroKey(comp), NullWritable.get()))
    .saveAsNewAPIHadoopFile(
      output,
      classOf[AvroKey[company]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[(company, NullWritable)]],
      job.getConfiguration)
}
val sqlSchemaFromAvro = new Schema.Parser().parse(Schemas.userSchema)
val myRdd = spark.sparkContext.parallelize(List(
  company(
    identifier(12345678, null, null),
    List(userdata(123, "John", 123, 789L, 432), userdata(234, "Paul", 234, 890L, 543)))))
help(myRdd, sqlSchemaFromAvro)
But this also failed, with the following exception:
Not in union ["int",{"type":"record","name":"personal_id_code","fields":[{"name":"code","type":"string"},{"name":"year_released","type":"string"}]},{"type":"record","name":"person_name","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"}]}]: identifier(12345678,null,null)
Is there a way to export the RDD/Dataset to Avro through Spark? Should I be considering a different approach?
1 Answer
In the end I had to fall back on the standard Avro library:
I generated the Java objects with avro-tools:
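A minimal sketch of that step, assuming the schema above is saved as company.avsc (the jar version is only an example):

java -jar avro-tools-1.11.1.jar compile schema company.avsc src/main/java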
Then, given an RDD with the correct structure, I used this code to generate the correct Avro file:
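A minimal sketch of that approach, not the exact original snippet: here company refers to the avro-tools-generated Java class (not the Scala case class from the question), and avroSchema is the schema parsed from the .avsc file:

import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Writes an RDD of avro-tools-generated `company` records as Avro files.
def writeCompanies(rdd: RDD[company], avroSchema: Schema, output: String)
                  (implicit spark: SparkSession): Unit = {
  val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
  AvroJob.setOutputKeySchema(job, avroSchema)

  rdd
    .map(c => (new AvroKey[company](c), NullWritable.get()))
    .saveAsNewAPIHadoopFile(
      output,
      classOf[AvroKey[company]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[company]],
      job.getConfiguration)
}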
Note: the Avro union is represented by a java Object. You can actually take the schema directly from the generated entities, but I preferred doing it this way.
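For illustration, building a record through the generated builder (hypothetical values): the setter for the union field accepts any member of the union as an Object.

// identifier set to the int branch of the union
val byUid = company.newBuilder()
  .setIdentifier(Int.box(12345678))
  .setUsers(java.util.Collections.emptyList[userdata]())
  .build()

// identifier set to the person_name branch of the union
val byName = company.newBuilder()
  .setIdentifier(person_name.newBuilder().setName("John").setSurname("Doe").build())
  .setUsers(java.util.Collections.emptyList[userdata]())
  .build()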
Performance was OK.