我正在进行一个更大的项目,其中该函数是从with Column中调用的。它有几个不同的操作,但下面是一个用法示例:
case class Employee(id: Int, name: String)
val df = Seq(new Employee(1, "Elia"), new Employee(2, null), new Employee(3, "Fang")).toDF
df.show
+---+----+
| id|name|
+---+----+
| 1|Elia|
| 2|null|
| 3|Fang|
+---+----+
def test1: Column = {
concat(col("id"), col("name"))
}
df.withColumn("concat", test1).show
+---+----+------+
| id|name|concat|
+---+----+------+
| 1|Elia| 1Elia|
| 2|null| null|
| 3|Fang| 3Fang|
+---+----+------+
所以我想要做的是,如果其中一列有任何空值,则抛出一个异常。Test1函数中的内容如下所示:
if(col("id").isNull.sum > 0){
throw IllegalArgumentException("id can not have any nulls")
}
但很明显,专栏不能相加。我也尝试了sum(ol(“id”).isNull),但同样无效。我在stackoverflow上看到的所有示例都与使用df级函数有关,例如df.filter(“id is null”).count>0。但在我使用的框架中,这将需要相当大的重构,以便执行简单的QC检查以抛出更准确的异常。我正在修改的函数的作用域没有访问 Dataframe 的权限。我想做的事有可能实现吗?
谢谢!
2条答案
按热度按时间mwg9r5ms1#
您可以为这种情况定义UDF(用户定义函数)。看一下这个例子:
wyyhbhjk2#
根据你使用的Spark版本,你有几个选择。
如果Spark版本低于3.1,您可以这样使用UDF:
对于Spark版本>=3.1,您还可以选择使用内置函数
raise_error
(文档)如下所示: