Scala Spark-获取仅包含Column的列中的空值数量,而不是DF

yjghlzjz  于 2022-11-09  发布在  Scala
关注(0)|答案(2)|浏览(126)

我正在进行一个更大的项目,其中该函数是从with Column中调用的。它有几个不同的操作,但下面是一个用法示例:

case class Employee(id: Int, name: String)
val df = Seq(new Employee(1, "Elia"), new Employee(2, null), new Employee(3, "Fang")).toDF
df.show
+---+----+
| id|name|
+---+----+
|  1|Elia|
|  2|null|
|  3|Fang|
+---+----+
def test1: Column =  {
  concat(col("id"), col("name"))
}
df.withColumn("concat", test1).show
+---+----+------+
| id|name|concat|
+---+----+------+
|  1|Elia| 1Elia|
|  2|null|  null|
|  3|Fang| 3Fang|
+---+----+------+

所以我想要做的是,如果其中一列有任何空值,则抛出一个异常。Test1函数中的内容如下所示:

if(col("id").isNull.sum > 0){
  throw IllegalArgumentException("id can not have any nulls")
}

但很明显,专栏不能相加。我也尝试了sum(ol(“id”).isNull),但同样无效。我在stackoverflow上看到的所有示例都与使用df级函数有关,例如df.filter(“id is null”).count>0。但在我使用的框架中,这将需要相当大的重构,以便执行简单的QC检查以抛出更准确的异常。我正在修改的函数的作用域没有访问 Dataframe 的权限。我想做的事有可能实现吗?
谢谢!

mwg9r5ms

mwg9r5ms1#

您可以为这种情况定义UDF(用户定义函数)。看一下这个例子:

import org.apache.spark.sql.{functions => F}

val testUdf =  F.udf((id: Option[Int], name: Option[String]) => {
  (id, name) match {
    case (None, _) => throw new RuntimeException("id can not have any nulls")
    case (_, None) => throw new RuntimeException("name can not have any nulls")
    case (Some(id), Some(name)) => s"$id$name"
  }
})
df.withColumn("concat", testUdf($"id", $"name")).show
wyyhbhjk

wyyhbhjk2#

根据你使用的Spark版本,你有几个选择。
如果Spark版本低于3.1,您可以这样使用UDF:

val throwExUdf = udf(
    (d: Option[String]) => {
      d match {
        case None => throw new RuntimeException("message")
        case Some(v) => v
      }
    }
  )

df.withColumn(
        "concat",
        when($"name".isNotNull, concat($"id", $"name")).otherwise(throwExUdf($"name"))
)

对于Spark版本>=3.1,您还可以选择使用内置函数raise_error(文档)
如下所示:

df.withColumn(
        "concat",
        when($"name".isNotNull, concat($"id", $"name")).otherwise(raise_error(lit("Name is null")))
)

相关问题