Scala Spark: error writing an Avro file with a complex UNION type

9jyewag0 published on 2023-10-18 in Scala

I need to write a Spark RDD to an Avro file with a given schema. The schema contains a complex UNION type, and Spark does not seem to support complex union types.
The Avro schema looks like this:

{
    "name": "company",
    "type": "record",
    "fields": [{
            "name": "identifier",
            "type": [
                "int",
                {
                    "name": "personal_id_code",
                    "type": "record",
                    "fields": [
                        {"name": "code", "type": "string"},
                        {"name": "year_released", "type": "string"}
                    ]
                },
                {
                    "name": "person_name",
                    "type": "record",
                    "fields": [
                        {"name": "name", "type": "string"},
                        {"name": "surname", "type": "string"}
                    ]
                }
            ]
        },{
            "name": "users",
            "type": {
                "type": "array",
                "items": {
                    "name": "userdata",
                    "type": "record",
                    "fields": [
                        {"name": "uid", "type": "int"},
                        {"name": "name", "type": "string"},
                        {"name": "zip", "type": "int"},
                        {"name": "timestamp", "type": "long"},
                        {"name": "properties", "type": "int"}
                    ]
                }
            }
        }
    ]
}

The RDD element type is built around the company type:

case class personal_id_code(code: String, year_released: String)
case class person_name(name: String, surname: String)
case class identifier(uid: Int, personal_id_code: personal_id_code, name: person_name)
case class company(identifier: identifier, users: List[userdata])
case class userdata(uid: Int, name: String, zip: Int, timestamp: Long, properties: Int)

I tried a simple dataset with Spark's Avro writer:

import spark.implicits._

val finalDs = List(
  company(identifier(12345678, null, null), List(userdata(123, "John", 123, 789L, 432), userdata(234, "Paul", 234, 890L, 543)))
).toDS()

finalDs.show()
finalDs
  .write.mode(SaveMode.Overwrite)
  .option("avroSchema", userSchema)
  .format("avro").save(path)

But this fails with the following exception:

Unsupported Avro UNION type ["int",{"type":"record","name":"personal_id_code","fields":[{"name":"code","type":"string"},{"name":"year_released","type":"string"}]},{"type":"record","name":"person_name","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"}]}]: Only UNION of a null type and a non-null type is supported

I later found out that Spark does not support complex Avro unions.
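If the schema itself could be changed, a common workaround is to flatten the union into a record of nullable fields, since spark-avro does handle unions of null and one other type (they map to nullable columns). A minimal sketch, using the hypothetical name identifierFlat:

// Sketch only: flatten the 3-way union into a record of optional fields,
// so each branch becomes a ["null", type] union that spark-avro can handle.
// Exactly one of the three fields would be non-empty per row.
case class identifierFlat(
  uid: Option[Int],                           // was the bare "int" branch
  personal_id_code: Option[personal_id_code], // was the first record branch
  person_name: Option[person_name]            // was the second record branch
)

In my case the schema is fixed, though.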
I also tried the Hadoop API:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def help(rdd: RDD[company], avroSchema: Schema)(implicit spark: SparkSession): Unit = {
    // Register the Avro schema as the job's output key schema
    val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
    AvroJob.setOutputKeySchema(job, avroSchema)

    val output = "C:/Temp/avrooutput"
    FileUtils.deleteDirectory(new File(output))

    // Wrap each record in an AvroKey; the value side is unused
    rdd.coalesce(64).map(comp => (new AvroKey(comp), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        output,
        classOf[AvroKey[company]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[company]],
        job.getConfiguration)
}

val sqlSchemaFromAvro = new Schema.Parser().parse(Schemas.userSchema)
val myRdd = spark.sparkContext.parallelize(List(
  company(identifier(12345678, null, null), List(userdata(123, "John", 123, 789L, 432), userdata(234, "Paul", 234, 890L, 543)))
))

help(myRdd, sqlSchemaFromAvro)

But this fails with the following exception:

Not in union ["int",{"type":"record","name":"personal_id_code","fields":[{"name":"code","type":"string"},{"name":"year_released","type":"string"}]},{"type":"record","name":"person_name","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"}]}]: identifier(12345678,null,null)

Is there a way to export the RDD/Dataset to Avro through Spark? Should I consider another approach?


b5buobof1#

In the end I had to fall back on the standard Avro library.
I generated the Java classes with avro-tools:

java -jar /path/to/avro-tools-1.11.1.jar compile schema companies.avsc
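For completeness, the generated classes need the matching Avro runtime on the classpath; with sbt that is something along the lines of:

// build.sbt: Avro runtime for the generated SpecificRecord classes,
// version matching the avro-tools jar used above
libraryDependencies += "org.apache.avro" % "avro" % "1.11.1"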

Then, given an RDD with the right structure, I used the following code to produce a correct Avro file.
Note: an Avro union is represented by a java.lang.Object in the generated classes.

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.specific.SpecificDatumWriter

val sqlSchemaFromAvro = new Schema.Parser().parse(Schemas.avroSchema)
// Collect to the driver and write a single Avro container file
val companies = rdd.collect()
val companyDatumWriter = new SpecificDatumWriter[company]()
val dataFileWriter = new DataFileWriter[company](companyDatumWriter)
val outputFile = File.createTempFile(s"${companyGroup}_tempfile_${currentTimeInMillis}", ".tmp")
dataFileWriter.create(sqlSchemaFromAvro, outputFile)
companies.foreach(dataFileWriter.append)
dataFileWriter.close()
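To illustrate the note above: with avro-tools codegen, a union of several non-null branches becomes a field of type java.lang.Object, so any branch instance can be assigned to it. A rough sketch using the generated builders (untested; exact accessor names depend on the avro-tools version, and asJava assumes Scala 2.13):

import scala.jdk.CollectionConverters._

// Build one userdata record through the generated builder
val user = userdata.newBuilder()
  .setUid(123).setName("John").setZip(123)
  .setTimestamp(789L).setProperties(432)
  .build()

// The identifier field accepts an Integer, a personal_id_code or a
// person_name, because the union is typed as java.lang.Object
val c = company.newBuilder()
  .setIdentifier(person_name.newBuilder().setName("John").setSurname("Doe").build())
  .setUsers(List(user).asJava)
  .build()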

You can actually take the schema directly from the generated classes, but I prefer it this way.
Performance was OK.
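If collecting the whole RDD to the driver ever becomes a problem, a possible variant (an untested sketch with illustrative paths; on a real cluster the files would need to go to a shared filesystem) is to write one Avro container file per partition:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.specific.SpecificDatumWriter

rdd.foreachPartition { companies =>
  // Parse the schema string on each executor
  val schema = new Schema.Parser().parse(Schemas.avroSchema)
  val writer = new DataFileWriter[company](new SpecificDatumWriter[company]())
  val partFile = new File(s"C:/Temp/avrooutput/part-${java.util.UUID.randomUUID()}.avro")
  writer.create(schema, partFile)
  companies.foreach(writer.append)
  writer.close()
}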
