ApacheKafka—无法动态地将json记录的模式传递给spark结构化流式记录

li9yvcax  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(266)

我有一个spark kakfa结构流媒体管道。听一个主题,它可能有不同模式的json记录。现在,我想基于键(x\u y)解析模式,然后应用到值部分来解析json记录。所以这里键的“y”部分说明了模式类型。我试图从udf中获取模式字符串,然后将其传递给from_json()函数。但它例外地失败了

org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of `schema`

使用的代码:

df.withColumn("data_type", element_at(split(col("key").cast("string"),"_"),1))
.withColumn("schema", schemaUdf($"data_type"))
.select(from_json(col("value").cast("string"), col("schema")).as("data"))

架构演示:

{
  "type" : "struct",
  "fields" : [ {
    "name" : "name",
    "type" : {
      "type" : "struct",
      "fields" : [ {
        "name" : "firstname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }]
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

使用的自定义项:

lazy val fetchSchema = (fileName : String) => {
    DataType.fromJson(mapper.readTree(new File(fileName)).toString)
  }
val schemaUdf = udf[DataType, String](fetchSchema)

注意:我没有使用合流特性。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题