我有一个spark kakfa结构流媒体管道。听一个主题,它可能有不同模式的json记录。现在,我想基于键(x\u y)解析模式,然后应用到值部分来解析json记录。所以这里键的“y”部分说明了模式类型。我试图从udf中获取模式字符串,然后将其传递给from_json()函数。但它例外地失败了
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of `schema`
使用的代码:
df.withColumn("data_type", element_at(split(col("key").cast("string"),"_"),1))
.withColumn("schema", schemaUdf($"data_type"))
.select(from_json(col("value").cast("string"), col("schema")).as("data"))
架构演示:
{
"type" : "struct",
"fields" : [ {
"name" : "name",
"type" : {
"type" : "struct",
"fields" : [ {
"name" : "firstname",
"type" : "string",
"nullable" : true,
"metadata" : { }
}]
},
"nullable" : true,
"metadata" : { }
} ]
}
使用的自定义项:
lazy val fetchSchema = (fileName : String) => {
DataType.fromJson(mapper.readTree(new File(fileName)).toString)
}
val schemaUdf = udf[DataType, String](fetchSchema)
注意:我没有使用合流特性。
暂无答案!
目前还没有任何答案,快来回答吧!