使用scala的布尔运算折叠sparkDataframe中的列

hof1towb  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(427)

如何使用scala将布尔列折叠为一行?
第1部分:

A  true
A  false
B  false
B  false
C  true
B  false
A  true
C  true

期望输出

B  false
A  true
C  true

我能想到的一个解决方案是按第一列条目对它们进行分组,在单独的Dataframe中过滤真行和假行,删除重复项,最后将一个Dataframe(假)添加到另一个(真)中,同时检查字母(如a)是否已经存在于另一个(真)Dataframe中。
这个解决方案相当混乱。另外,不知道这是否适用于所有边缘情况。有什么聪明的方法吗。
我是一个绝对的初学者,任何帮助都是感激的。
编辑:给定的答案适用于上述给定场景,但不适用于此场景。有什么方法可以达到预期的效果吗?
第2部分:

A  true    "Apple"
A  false   ""
B  false   ""
B  false   ""
C  true    "Cat"
C  true    "Cotton"
C  false   ""

期望输出

B  false []
A  true  ["Apple"]
C  true  ["Cat","Cotton"]

我试图通过按col1和col2分组,然后使用collect\u set折叠col3来实现这一点,然后
按第1列分组
收集第二列作为布尔值集
检查是否有一个true如果是,那么or表达式将始终计算为true。
但这会导致col3\U集合全部丢失。

iyfjxgzm

iyfjxgzm1#

按第1列分组
收集第二列作为布尔值集
收集第3列作为字符串集
检查是否有 true 如果是,那么您的or表达式将始终计算为true。
删除空字符串 ""col3_set ```
import org.apache.spark.sql.functions._

object GroupByAgg {

def main(args: Array[String]): Unit = {

val spark = Constant.getSparkSess

import spark.implicits._

val df = List(("A", true,"Apple"), ("A", false,""),
  ("B", false,""),
  ("B", false,""),
  ("C", true,"Cat"),
  ("C", true,"Cotton"),
  ("C", true,"")).toDF("Col1", "Col2","Col3")

//Group by 1st column
df.groupBy("Col1")
  // Collect unique values
  .agg(collect_set("Col2").as("Col2_set"),collect_set("Col3").as("Col3_set"))
  //check if the array contains single true
  .withColumn("OutputCol2", when(array_contains(col("Col2_set"), true), true)
    .otherwise(false))
.withColumn("OutputCol3",array_remove(col("Col3_set"),lit("")))

//.withColumn("OutputCol3",expr("filter(Col3_set, x -> x != '')"))
.drop("Col2_set")
.drop("Col3_set")
.show()
}

}

输出:

+----+----------+-------------+
|Col1|OutputCol2| OutputCol3|
+----+----------+-------------+
| B| false| []|
| C| true|[Cat, Cotton]|
| A| true| [Apple]|
+----+----------+-------------+

06odsfpq

06odsfpq2#

试试这个:
col1分组 collect_set 在col2上——对于boolean,collect\u set最多只能提供2个元素,这对性能也有好处
将第2步中收集的布尔值集传递给一个udf,它将执行一个简单的 recudeLeft 为了做 OR 所有元素的操作。

scala> val df = List(
     | ("A",  true),
     | ("A",  false),
     | ("B",  false),
     | ("B",  false),
     | ("C",  true),
     | ("B",  false),
     | ("A",  true),
     | ("C",  true)
     | ).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: boolean]

scala> df.show
+----+-----+
|col1| col2|
+----+-----+
|   A| true|
|   A|false|
|   B|false|
|   B|false|
|   C| true|
|   B|false|
|   A| true|
|   C| true|
+----+-----+

scala> val aggOr = udf((a:Seq[Boolean])=>{a.reduceLeft(_||_)})
aggOr: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(ArrayType(BooleanType,false))))

scala> df.groupBy("col1").agg(aggOr(collect_set("col2")).as("col2Or")).show
+----+------+
|col1|col2Or|
+----+------+
|   B| false|
|   C|  true|
|   A|  true|
+----+------+

相关问题