spark udaf动态输入模式处理

jrcvhitl  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(573)

我知道如何将一个具有内部结构的结构传递给udaf,然后在spark中将一个结构传递给udaf
但是我如何处理内部结构模式未知或是动态的情况,因为它是根据数据变化的。有些字段可能存在,也可能不存在,因为输入数据不符合特定的模式。假设一个数据集

root
     |-- id:string (nullable = false)
     |-- age: long (nullable = true)
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
     |-- name: string (nullable = true)

另一个数据集没有car3

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |-- name: string (nullable = true)

如何编写接受基于输入数据更改的模式的udaf。

tjjdgumg

tjjdgumg1#

可以在初始化udaf类时动态传递架构-

val yetAnotherUdaf = new YetAnotherUdaf(schema)

    case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction {

      override def deterministic:Boolean=true
      override def dataType:DataType=schema
      override def inputSchema:StructType=schema
      override def bufferSchema:StructType=schema

      override def initialize(buffer:MutableAggregationBuffer):Unit={ ??? }
      override def update(buffer:MutableAggregationBuffer, input:Row):Unit={ ??? }
      override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={???}
      override def evaluate(buffer:Row):StructType={ ??? }
   }

相关问题