什么是非类型化scala udf和类型化scala udf?他们有什么不同?

x6yk4ghg  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(510)

我使用Spark2.4已经有一段时间了,最近几天刚开始切换到Spark3.0。我在切换到spark 3.0运行后出现了这个错误 udf((x: Int) => x, IntegerType) :

Caused by: org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`
2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType)`, if input types are all non primitive
3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution;

这些解决方案是由spark自己提出的,在谷歌搜索了一段时间后,我进入了spark迁移指南页面:
在spark 3.0中,默认情况下不允许使用org.apache.spark.sql.functions.udf(anyref,datatype)。删除return type参数以自动切换到类型化的scala udf,或者将spark.sql.legacy.allowuntypedscalaudf设置为true以继续使用它。在sparkversion2.4及更低版本中,如果org.apache.spark.sql.functions.udf(anyref,datatype)获得带有基元类型参数的scala闭包,则如果输入值为null,则返回的udf将返回null。但是,在spark 3.0中,如果输入值为null,则udf将返回java类型的默认值。例如,val f=udf((x:int)=>x,integertype),f($“x”)在spark 2.4中返回null,如果x列为null,则在spark 3.0中返回0。引入这种行为更改是因为spark 3.0默认使用scala 2.12构建。
来源:spark迁移指南
我注意到我通常的使用方法 function.udf api,即 udf(AnyRef, DataType) ,称为 UnTyped Scala UDF 建议的解决方案是 udf(AnyRef) ,称为 Typed Scala UDF .
据我所知,第一个看起来比第二个更严格的类型化,其中第一个有显式定义的输出类型,而第二个没有,因此我不明白为什么称之为非类型化。
函数也被传递给了 udf ,即 (x:Int) => x ,显然已定义其输入类型,但 You're using untyped Scala UDF, which does not have the input type information ?
我的理解正确吗?即使经过更深入的搜索,我仍然找不到任何材料来解释什么是非类型化的scala-udf,什么是类型化的scala-udf。
所以我的问题是:它们是什么?他们有什么不同?

rnmwe5a2

rnmwe5a21#

在类型化scala udf中,udf知道作为参数传递的列的类型,而在非类型化scala udf中,udf不知道作为参数传递的列的类型
当创建类型化的scala udf时,作为udf的参数和输出传递的列的类型是从函数参数和输出类型推断出来的,而当创建非类型化的scala udf时,对于参数或输出,根本没有类型推断。
令人困惑的是,在创建类型化的udf时,类型是从函数中推断出来的,而不是显式地作为参数传递。更明确地说,您可以编写类型化的自定义项创建,如下所示:

val my_typed_udf = udf[Int, Int]((x: Int) => Int)

现在,让我们看看你提出的两点。
据我所知,第一个 udf(AnyRef, DataType) )看起来比第二个更严格(例如 udf(AnyRef) )其中第一个有显式定义的输出类型,而第二个没有,因此我混淆了为什么称之为非类型化。
根据spark函数scaladoc udf 将函数转换为自定义项的函数实际上是:

def udf(f: AnyRef, dataType: DataType): UserDefinedFunction

对于第二个:

def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction

所以第二个函数的类型实际上比第一个函数的类型更多,因为第二个函数考虑了作为参数传递的函数的类型,而第一个函数删除了函数的类型。
这就是为什么在第一个函数中需要定义返回类型,因为spark需要这些信息,但不能从作为参数传递的函数中推断它,因为它的返回类型被删除,而在第二个函数中,返回类型是从作为参数传递的函数中推断出来的。
函数也被传递给了 udf ,即 (x:Int) => x ,显然已定义其输入类型,但 You're using untyped Scala UDF, which does not have the input type information ?
这里重要的不是函数,而是spark如何从这个函数创建udf。
在这两种情况下,要转换为udf的函数都定义了其输入和返回类型,但在使用 udf(AnyRef, DataType) .

tez616oj

tez616oj2#

这并没有回答您最初关于不同的UDF是什么的问题,但是如果您想消除错误,在python中,您可以在脚本中包含以下行: spark.sql("set spark.sql.legacy.allowUntypedScalaUDF=true") .

相关问题