spark scala Kafka avro扩展器[复制]

xpcnnkqh  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(107)

此问题已在此处有答案

Convert a JSON string to a struct column without schema in Spark(4个答案)
上个月关门了。
我在吃一个来自Kafka的流媒体框架。值列中的数据在avro中。我想把数据转换成结构类型。
目前我已经写了一个udf函数,它调用KafkaAvroDeserializer中的方法public Object deserialize(String topic, byte[] bytes)。我在流框架的值列上调用这个udf函数。udf函数成功返回json字符串。
我希望在value列中得到struct类型,而不是json字符串。例如,现在我得到{"name": "ABC", "age": 17},而不是我想要的值列是结构类型,这样我就可以写value.namevalue.age
问题是,我没有一个case类或一个示例json或一个模式文件来将json字符串转换为结构类型。相同的代码可以用于具有不同数据的不同主题。
任何指导都将非常有帮助。请让我知道,如果我是在错误的方式思考或做在错误的方式量化。
udf函数代码

val kafkaAvroDeserializer = new KafkaAvroDeserializer()
  private val kafkaAvroDeserializerConfig: Map[String, Any] = Map(
    "schema.registry.url" -> //url for schema registry
  )
  kafkaAvroDeserializer.configure(kafkaAvroDeserializerConfig.asJava, false)

  val deserializationFunction: UserDefinedFunction = udf((input: Array[Byte]) => {
      val genericRecord = kafkaAvroDeserializer.deserialize("topic", input)
        .asInstanceOf[GenericRecord]
      genericRecord.toString // currently returns json string successfully
 }

使用udf方法(Data是一个简单的case类,key:字符串和值:字符串)

streamingDF.selectExpr("key", "value")
        .as[Data]
        .select(col("key"), deserializationFunction(col("value")).as("value"))

这不是一个重复的问题。我得到了一个提示,如果其他问题有助于回答我的问题。我点击了是,因为它给了我更多的洞察力。我不知道它会关闭我的问题并将其标记为重复。我在之前的评论中也有同样的回答。我已经通过了link。提供的解决方案几乎如预期的那样工作。有几个小问题。第一个解决方案是在我的ide中显示deprecated。第二种解决方案不包括其他列,如key,topic,offset。这两种解决方案都不能在流媒体框架上工作,除非我使用foreachbatch。老实说,我期待一个围绕udf函数的解决方案。

f0brbegy

f0brbegy1#

你现在有一个JSON字符串,所以Avro并不真正相关。
您可以将get_json_object用于JsonPath表达式。
否则,此处显示的大量选项Integrating Spark Structured Streaming with the Confluent Schema Registry

相关问题