我想用提供的avro模式而不是spark自动生成的模式来编写avro格式的Dataframe。如何告诉spark在编写时使用自定义模式?
{
"type" : "record",
"name" : "name1",
"namespace" : "com.data"
"fields" : [
{
"name" : "id",
"type" : "string"
},
{
"name" : "count",
"type" : "int"
},
{
"name" : "val_type",
"type" : {
"type" : "enum",
"name" : "ValType"
"symbols" : [ "s1", "s2" ]
}
}
]
}
使用avroschema读取avro。在这一步上一切正常。
数据集d1=spark.read().option(“avroschema”,string.valueof(inavroschema)).format(“com.databricks.spark.avro”).load(“s3\u path”);
在这里,我对上述数据执行一些spark.sql并将其存储到dataframe。
当我试图基于avro模式将avro数据写入s3时
数据类型:
root
|-- id: string (nullable = true)
|-- count: integer (nullable = true)
|-- val_type: string (nullable = true)
finaldf.write().option(“avroschema”,string.valueof(inavroschema)).format(“com.databricks.spark.avro”).mode(“overwrite”).save(“target\u s3\u path”);
我有个错误:
User class threw exception: org.apache.spark.SparkException: Job aborted.
......
Caused by: org.apache.avro.AvroRuntimeException:**Not a union: "string"**
at org.apache.avro.Schema.getTypes(Schema.java:299)
at
org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
有没有任何方法可以使用avro模式来编写avro数据,或者它的方法是否正确(使用 "option("avroSchema",String.valueOf(inAvroSchema))"
)-可能是我做错了什么? "forceSchema" option
对我来说不管用。
提前谢谢。
2条答案
按热度按时间abithluo1#
你可以用
org.apache.spark:spark-avro
打包并尝试设置avroSchema
上的选项to_avro
功能。这是医生:https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-来自阿夫罗rt4zxlrg2#
我四处打探,发现了一些有趣的东西,
当我用spark2.4.x执行代码时,上面的代码失败了,但是当我用新的spark3.0.0运行相同的代码时,代码成功了,数据也成功地写入了。
我想最好的办法是升级spark版本或更改avro模式定义。