此问题已在此处有答案:
Convert a JSON string to a struct column without schema in Spark(4个答案)
上个月关门了。
我在吃一个来自Kafka的流媒体框架。值列中的数据在avro中。我想把数据转换成结构类型。
目前我已经写了一个udf函数,它调用KafkaAvroDeserializer
中的方法public Object deserialize(String topic, byte[] bytes)
。我在流框架的值列上调用这个udf函数。udf函数成功返回json字符串。
我希望在value列中得到struct类型,而不是json字符串。例如,现在我得到{"name": "ABC", "age": 17}
,而不是我想要的值列是结构类型,这样我就可以写value.name
或value.age
。
问题是,我没有一个case类或一个示例json或一个模式文件来将json字符串转换为结构类型。相同的代码可以用于具有不同数据的不同主题。
任何指导都将非常有帮助。请让我知道,如果我是在错误的方式思考或做在错误的方式量化。
udf函数代码
val kafkaAvroDeserializer = new KafkaAvroDeserializer()
private val kafkaAvroDeserializerConfig: Map[String, Any] = Map(
"schema.registry.url" -> //url for schema registry
)
kafkaAvroDeserializer.configure(kafkaAvroDeserializerConfig.asJava, false)
val deserializationFunction: UserDefinedFunction = udf((input: Array[Byte]) => {
val genericRecord = kafkaAvroDeserializer.deserialize("topic", input)
.asInstanceOf[GenericRecord]
genericRecord.toString // currently returns json string successfully
}
使用udf方法(Data是一个简单的case类,key:字符串和值:字符串)
streamingDF.selectExpr("key", "value")
.as[Data]
.select(col("key"), deserializationFunction(col("value")).as("value"))
这不是一个重复的问题。我得到了一个提示,如果其他问题有助于回答我的问题。我点击了是,因为它给了我更多的洞察力。我不知道它会关闭我的问题并将其标记为重复。我在之前的评论中也有同样的回答。我已经通过了link。提供的解决方案几乎如预期的那样工作。有几个小问题。第一个解决方案是在我的ide中显示deprecated。第二种解决方案不包括其他列,如key,topic,offset。这两种解决方案都不能在流媒体框架上工作,除非我使用foreachbatch。老实说,我期待一个围绕udf函数的解决方案。
1条答案
按热度按时间f0brbegy1#
你现在有一个JSON字符串,所以Avro并不真正相关。
您可以将
get_json_object
用于JsonPath表达式。否则,此处显示的大量选项Integrating Spark Structured Streaming with the Confluent Schema Registry