apachespark-3之后如何定义userdefinedaggregatefunction?

tpxzln5u  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(230)

我使用的是spark 3.0,为了将用户定义的函数用作窗口函数,我需要一个 UserDefinedAggregateFunction . 起初我认为使用新的 Aggregator 以及 udaf 可以解决这个问题(如图所示),但是 udaf 返回一个 UserDefinedFunction ,不是 UserDefinedAggregateFunction .
从spark 3.0开始, UserDefinedAggregateFunction 如本文所述,已弃用(尽管仍有可能找到它)
所以问题是:spark 3.0中有没有一种正确的(不是不推荐的)方法来定义一个合适的 UserDefinedAggregateFunction 并将其用作窗口函数?

j13ufse2

j13ufse21#

在spark3中,新的api使用 Aggregator 要定义用户定义的聚合,请执行以下操作:
抽象类 Aggregator[-IN, BUF, OUT] 扩展可序列化:
用于用户定义聚合的基类,可在数据集操作中使用,以获取组的所有元素并将其缩减为单个值。
与不推荐使用的udaf相比,aggregator带来了性能改进。您可以看到高效用户定义聚合器的问题。
下面是一个如何定义平均聚合器并使用 functions.udaf 方法:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

val meanAgg= new Aggregator[Long, (Long, Long), Double]() {

    def zero = (0L, 0L) // Init the buffer

    def reduce(y: (Long, Long), x: Long) = (y._1 + x, y._2 + 1)

    def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2)

    def finish(r: (Long, Long)) = r._1.toDouble / r._2

    def bufferEncoder: Encoder[(Long, Long)] = implicitly(ExpressionEncoder[(Long, Long)])

    def outputEncoder: Encoder[Double] = implicitly(ExpressionEncoder[Double])
}

val meanUdaf = udaf(meanAgg)

与窗口一起使用:

val df = Seq(
  (1, 2), (1, 5),
  (2, 3), (2, 1),
).toDF("id", "value")

df.withColumn("mean", meanUdaf($"value").over(Window.partitionBy($"id"))).show
//+---+-----+----+
//| id|value|mean|
//+---+-----+----+
//|  1|    2| 3.5|
//|  1|    5| 3.5|
//|  2|    3| 2.0|
//|  2|    1| 2.0|
//+---+-----+----+

相关问题