Apache Spark: check a DataFrame for repeated values and implement an ignoreNulls parameter

Posted by qkf9rpyu on 2022-11-16 in Apache

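I created a function that checks whether a DataFrame contains repeated values, based on a sequence of key columns.
I want to implement an "ignoreNulls" option, passed to the function as a Boolean parameter:

  • If true, null values are ignored: they are not grouped or counted, so "newColName" returns false for nulls.
  • If false (the default), nulls are treated as a single group, so it returns true when the key I'm checking has more than one null.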
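The two modes above can be pinned down with a small reference model on plain Scala collections, independent of Spark. This is only a sketch for illustration: `repeatedFlags` is a hypothetical helper, and `None` stands in for a SQL null.

```scala
// Hypothetical reference model of the requested semantics (plain Scala, no Spark).
// Each element is one row's key value; None plays the role of a SQL null.
def repeatedFlags(keys: Seq[Option[String]], ignoreNulls: Boolean = false): Seq[Boolean] = {
  // occurrences of each key value; None is counted as one group, as Spark does when grouping
  val counts: Map[Option[String], Int] =
    keys.groupBy(identity).map { case (value, rows) => value -> rows.size }

  keys.map {
    case None if ignoreNulls => false             // ignoreNulls = true: a null is never "repeated"
    case value               => counts(value) > 1 // otherwise: repeated if its group has more than one row
  }
}

// the values of the "name" column in the question's test data
val names = Seq(Some("name-1"), Some("repeated-name"), Some("repeated-name"), Some("name-4"), None, None)

val defaultFlags  = repeatedFlags(names)                      // false, true, true, false, true, true
val ignoringNulls = repeatedFlags(names, ignoreNulls = true)  // false, true, true, false, false, false
```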

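I don't know how to do this. Should I use an if or a case? Is there some expression that ignores nulls in the partitionBy statement?
Can anyone help?
This is the current function: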

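import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

// Flags every row whose combination of `keys` occurs more than once
def checkRepeatedKey(newColName: String, keys: Seq[String])(dataframe: DataFrame): DataFrame = {
    val repeatedCondition = col("sum") > 1
    // one window partition per distinct key combination
    val windowCondition   = Window.partitionBy(keys.head, keys.tail: _*)

    dataframe
      .withColumn("count", lit(1))
      .withColumn("sum", sum("count").over(windowCondition))
      .withColumn(newColName, repeatedCondition)
      .drop("count", "sum")
}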

Some test data:

import spark.implicits._  // spark is the active SparkSession; needed for toDF
val testDF = Seq(
      ("1", Some("name-1")),
      ("2", Some("repeated-name")),
      ("3", Some("repeated-name")),
      ("4", Some("name-4")),
      ("5", None),
      ("6", None)
    ).toDF("name_key", "name")

Testing the function:
val results = testDF.transform(checkRepeatedKey("has_repeated_name", Seq("name")))
Output (without the ignoreNulls implementation):

+--------+-------------+-----------------+
|name_key|         name|has_repeated_name|
+--------+-------------+-----------------+
|       1|       name-1|            false|
|       2|repeated-name|             true|
|       3|repeated-name|             true|
|       4|       name-4|            false|
|       5|         null|             true|
|       6|         null|             true|
+--------+-------------+-----------------+
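The two null rows come out as true because Spark's Window.partitionBy, like groupBy, puts every row whose partition key is null into the same partition, so the two null names form one group of two rows. A minimal sketch that makes this visible, assuming spark-sql is on the classpath and a local SparkSession; the column name rows_in_partition is only illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

// local session for the sketch; in spark-shell getOrCreate returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("null-partition-demo").getOrCreate()
import spark.implicits._

val testDF = Seq(
  ("1", Some("name-1")),
  ("2", Some("repeated-name")),
  ("3", Some("repeated-name")),
  ("4", Some("name-4")),
  ("5", None),
  ("6", None)
).toDF("name_key", "name")

// number of rows in each "name" partition; both null rows land in one shared partition
val counted = testDF.withColumn("rows_in_partition", count(lit(1)).over(Window.partitionBy("name")))

val nullRowCounts: Seq[Long] =
  counted.filter(col("name").isNull).select("rows_in_partition").collect().map(_.getLong(0)).toSeq
```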

With ignoreNulls = true, the result should look like this:

// function header with the ignoreNulls parameter (false by default)
def checkRepeatedKey(newColName: String, keys: Seq[String], ignoreNulls: Boolean = false)(dataframe: DataFrame): DataFrame = ???

// using the function, passing true for ignoreNulls
testDF.transform(checkRepeatedKey("has_repeated_name", Seq("name"), true))

// expected output for the null rows
+--------+-------------+-----------------+
|name_key|         name|has_repeated_name|
+--------+-------------+-----------------+
|       5|         null|            false|
|       6|         null|            false|
+--------+-------------+-----------------+
Answer #1 by jgwigjjp

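First, you need to decide what happens when only some of the columns in keys are null: does that count as a "null value", or is a key only "null" when all of its columns are null?
For simplicity, let's assume keys contains only one column (you can easily extend the logic to multiple columns). Then all you need is a simple if in the checkRepeatedKey function: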

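import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum, when}

def checkIfNullValue(keys: Seq[String]): Column = {
    // for the sake of simplicity, only the first key is checked
    col(keys.head).isNull
}

def checkRepeatedKey(newColName: String, keys: Seq[String], ignoreNulls: Boolean = false)(dataframe: DataFrame): DataFrame = {
    // repeatedCondition and windowCondition as in the question
    ...
    ...

    val df = dataframe
      .withColumn("count", lit(1))
      .withColumn("sum", sum("count").over(windowCondition))
      .withColumn(newColName, repeatedCondition)
      .drop("count", "sum")

    // with ignoreNulls, force the flag to false wherever the key is null; keep it otherwise
    if (ignoreNulls)
        df.withColumn(newColName, when(checkIfNullValue(keys), lit(false)).otherwise(df(newColName)))
    else df
}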
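For reference, the answer's approach can be assembled into one self-contained sketch. Assumptions: spark-sql is on the classpath with a local SparkSession; ignoreNulls defaults to false as in the question; and, for multi-column keys, a key counts as null when any of its columns is null (replace || with && to require all of them). Unlike the snippet above, it builds the flag in a single expression with count(lit(1)) over the window instead of the temporary count/sum columns; that is a stylistic choice for this sketch, not part of the original answer.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, when}

// local session for the sketch; in spark-shell getOrCreate returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("checkRepeatedKey").getOrCreate()
import spark.implicits._

// Assumption: a key is "null" if ANY of its columns is null.
// Use reduce(_ && _) instead to treat it as null only when ALL key columns are null.
def checkIfNullValue(keys: Seq[String]): Column =
  keys.map(col(_).isNull).reduce(_ || _)

def checkRepeatedKey(newColName: String, keys: Seq[String], ignoreNulls: Boolean = false)(dataframe: DataFrame): DataFrame = {
  // partitionBy groups null keys together, so repeated nulls get a row count > 1
  val windowCondition = Window.partitionBy(keys.head, keys.tail: _*)
  val repeated = count(lit(1)).over(windowCondition) > 1

  // with ignoreNulls, a null key is never reported as repeated
  val flag = if (ignoreNulls) when(checkIfNullValue(keys), lit(false)).otherwise(repeated) else repeated
  dataframe.withColumn(newColName, flag)
}

val testDF = Seq(
  ("1", Some("name-1")),
  ("2", Some("repeated-name")),
  ("3", Some("repeated-name")),
  ("4", Some("name-4")),
  ("5", None),
  ("6", None)
).toDF("name_key", "name")

val withNulls     = testDF.transform(checkRepeatedKey("has_repeated_name", Seq("name")))
val ignoringNulls = testDF.transform(checkRepeatedKey("has_repeated_name", Seq("name"), ignoreNulls = true))
```

Sorted by name_key, withNulls should match the first table in the question, and ignoringNulls should differ only in rows 5 and 6, which become false.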
