如何使用带有窗口函数的自定义spark聚合器

n1bvdmb6  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(320)

我正在尝试计算滑动窗口上的自定义聚合,spark不断向我抛出以下错误:
我会非常感谢任何一个好的解决方案!

上下文

我在做什么

我正在定义一个自定义聚合器,使用 org.apache.spark.sql.expressions.Aggregator 我正在尝试应用它:

case class MySum(colName: String) extends Aggregator[Row, Double, Double] {

    def zero: Double = 0d
    def reduce(acc: Double, row: Row): Double = acc * 0.78 + 0.21 * row.getAs[Double](colName)

    def merge(acc1: Double, acc2: Double): Double = acc1 + acc2
    def finish(acc: Double): Double = acc

    def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

设置

spark:3.0.0,scala:2.12.10

问题

窗口函数=>失败

val spark = SparkSession.builder.getOrCreate
import spark.implicits._
val df = Seq(
      (1L, 2.569d),
      (2L, 5.89d),
      (3L, 4.28d),
      (4L, 2.15d),
      (5L, 6.43d),
      (6L, 8.92d),
      (7L, 5.86d),
      (8L, 1.65d),
      (9L, 2.28d)
).toDF("order", "price")

val win = Window.orderBy("order").rangeBetween(0, 1)
df.withColumn("new_column", MySum("price").toColumn.as("new_column").over(win))

org.apache.spark.sql.analysisexception:分组表达式序列为空,并且' b '不是聚合函数。包裹(mysum(boundreference(), value ,assertnotnull(cast(value as double)),boundreference())over(当前行和后面1行之间的行)as c )'在窗口函数或 Package 中' b 'in first()(或first\u value),如果你不在乎得到哪个值的话。;;

值得一提的

“简单agg”工作正常:

df.groupBy("order").agg(MySum("price").toColumn.as("new_column")).show()

# +-----+----------+

# |order|new_column|

# +-----+----------+

# |    7|      2.93|

# |    6|      4.46|

# |    9|      1.14|

# |    5|     3.215|

# |    1|    1.2845|

# |    3|      2.14|

# |    8|     0.825|

# |    2|     2.945|

# |    4|     1.075|

# +-----+----------+

如何使自定义聚合器与窗口函数一起工作?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题