I'm trying to compute a custom aggregation over a sliding window, and Spark keeps throwing the error shown below.
I'd be very grateful for any good solution!
Context
What I'm doing
I define a custom aggregator using org.apache.spark.sql.expressions.Aggregator
and try to apply it:
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

case class MySum(colName: String) extends Aggregator[Row, Double, Double] {
  // initial buffer value
  def zero: Double = 0d
  // fold one row into the buffer: exponentially weighted update
  def reduce(acc: Double, row: Row): Double = acc * 0.78 + 0.21 * row.getAs[Double](colName)
  // combine partial buffers from different partitions
  def merge(acc1: Double, acc2: Double): Double = acc1 + acc2
  def finish(acc: Double): Double = acc
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
Setup
Spark: 3.0.0, Scala: 2.12.10
The problem
Window function => fails
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder.getOrCreate
import spark.implicits._

val df = Seq(
  (1L, 2.569d),
  (2L, 5.89d),
  (3L, 4.28d),
  (4L, 2.15d),
  (5L, 6.43d),
  (6L, 8.92d),
  (7L, 5.86d),
  (8L, 1.65d),
  (9L, 2.28d)
).toDF("order", "price")

// frame: current row plus the next row, ordered by "order"
val win = Window.orderBy("order").rangeBetween(0, 1)
df.withColumn("new_column", MySum("price").toColumn.as("new_column").over(win))
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`order`' is not an aggregate function. Wrap '(mysum(boundreference(), value, assertnotnull(cast(value as double)), boundreference()) OVER (ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS `new_column`)' in windowing function(s) or wrap '`order`' in first() (or first_value) if you don't care which value you get.;;
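For comparison, a built-in aggregate over the exact same window spec runs fine, so the window definition itself does not seem to be the issue (the column name builtin_sum below is just for illustration):

import org.apache.spark.sql.functions.sum
df.withColumn("builtin_sum", sum("price").over(win)).show()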
For what it's worth,
the "simple" aggregation works fine:
df.groupBy("order").agg(MySum("price").toColumn.as("new_column")).show()
# +-----+----------+
# |order|new_column|
# +-----+----------+
# | 7| 2.93|
# | 6| 4.46|
# | 9| 1.14|
# | 5| 3.215|
# | 1| 1.2845|
# | 3| 2.14|
# | 8| 0.825|
# | 2| 2.945|
# | 4| 1.075|
# +-----+----------+
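One direction I'm considering, as an untested sketch: Spark 3.0 added org.apache.spark.sql.functions.udaf, which wraps an Aggregator into a UserDefinedFunction that the analyzer accepts as an aggregate expression. The reworked aggregator below consumes the column value (Double) directly instead of a whole Row; the object name MySumAgg is mine, not from the code above:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Same logic as MySum, but typed on the column value instead of Row,
// so it can be registered with functions.udaf.
object MySumAgg extends Aggregator[Double, Double, Double] {
  def zero: Double = 0d
  def reduce(acc: Double, price: Double): Double = acc * 0.78 + 0.21 * price
  def merge(acc1: Double, acc2: Double): Double = acc1 + acc2
  def finish(acc: Double): Double = acc
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val mySum = udaf(MySumAgg)
df.withColumn("new_column", mySum(col("price")).over(win)).show()

Taking Double as the input type also removes the need for a Row encoder, since the column is selected at call time rather than inside reduce.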