apachespark-向自定义聚合器构造函数传递参数是否有问题?

8tntrjer  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(258)

我创造了一个 Aggregator 用作 udaf ,它在一个Dataframe中使用三列来计算结果,但它还需要其他两个参数,这两个参数是每行通用的。最初,我这样定义输入类型(简化不必要的细节)

case class In(a: Long, b: Double, c: Double, d: Long, e: Double)
class MyUDAF extends Aggregator[In, Buf, Long] {
   ...
}

并使用 litorg.apache.spark.sql.functions :

val myudaf = udaf(new MyUDAF, ExpressionEncoder[In])
val df: DataFrame = _ // suppose there's an actual DataFrame here
df.withColumn("result", myudaf(col("a"), col("b"), col("c"), lit(100L), lit(10.0)))

它工作得很好,但我不喜欢这种将这两个参数作为列传递的方法,因为我必须将它们保存在缩减缓冲区(reduction buffers)中 merge 方法仅将缓冲区作为参数)。所以我决定把它们包括在 MyUDAF 构造函数,并按如下方式使用:

case class In(a: Long, b: Double, c: Double)
class MyUDAF(d: Long, e: Double) extends Aggregator[In, Buf, Long] {
   ...
}
val myudaf = udaf(new MyUDAF(100L, 10.0), ExpressionEncoder[In])
val df: DataFrame = _
df.withColumn("result", myudaf(col("a"), col("b"), col("c")))

这在本地测试中也非常有效。但我对spark还不熟悉,所以我不知道这种做法是否会带来可能的错误。不幸的是,我目前无法访问更多的机器来创建集群并检查自己是否在更复杂的场景中出错。所以问题是:使用数据的行为是否与输入中包含的数据不同 Row s和缓冲区(如构造函数中的值)是否会导致任何问题、错误或副作用?我的第二种方法行吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题