基于给定的操作列创建一个新的数据集

rdlzhqv9  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(414)

我使用的是spark-sql-2.3.1v,具有以下场景:
给定数据集:

val ds = Seq(
  (1, "x1", "y1", "0.1992019"),
  (2, null, "y2", "2.2500000"),
  (3, "x3", null, "15.34567"),
  (4, null, "y4", null),
  (5, "x4", "y4", "0")
   ).toDF("id","col_x", "col_y","value")

+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  2| null|   y2|2.2500000|
|  3|   x3| null| 15.34567|
|  4| null|   y4|     null|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

要求:
我得到操作栏(即。, operationCol )我需要从外部进行一些计算。
在对列“col\ux”执行某些操作时,我需要通过过滤掉所有具有“col\ux”空值的记录来创建一个新的数据集,并返回该新数据集。
同样,在对列“col\y”执行某些操作时,我需要通过过滤掉所有具有“col\y”空值的记录并返回新的数据集来创建一个新的数据集。
例子:

val operationCol ="col_x";

if(operationCol === "col_x"){
  //filter out all rows which has "col_x" null and return that new dataset.
}

if(operationCol === "col_y"){
  //filter out all rows which has "col_y" null and return that new dataset.
}

当operationcol==“col\u x”预期输出时:

+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  3|   x3| null| 15.34567|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

当operationcol==“col\u y”预期输出时:

+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  2| null|   y2|2.2500000|
|  4| null|   y4|     null|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+

如何实现这一预期产出?换句话说,如何进行Dataframe的分支?如何在流的中间创建一个新的dataframe/dataset?

e4yzc0pl

e4yzc0pl1#

你也可以使用 filter 筛选空值。

scala> val operationCol = "col_x" // for one column
operationCol: String = col_x

scala> ds.filter(col(operationCol).isNotNull).show(false)
+---+-----+-----+---------+
|id |col_x|col_y|value    |
+---+-----+-----+---------+
|1  |x1   |y1   |0.1992019|
|3  |x3   |null |15.34567 |
|5  |x4   |y4   |0        |
+---+-----+-----+---------+

scala> val operationCol = Seq("col_x","col_y") // For multiple Columns
operationCol: Seq[String] = List(col_x, col_y)

scala> ds.filter(operationCol.map(col(_).isNotNull).reduce(_ && _)).show
+---+-----+-----+---------+
| id|col_x|col_y|    value|
+---+-----+-----+---------+
|  1|   x1|   y1|0.1992019|
|  5|   x4|   y4|        0|
+---+-----+-----+---------+
wztqucjr

wztqucjr2#

你可以用 df.na.drop() 删除包含空值的行。drop函数可以将要考虑的列的列表作为输入,因此在本例中,可以按如下方式编写:

val newDf = df.na.drop(Seq(operationCol))

这将创建一个新的Dataframe newDf 所有行都在哪里 operationCol 已删除。

相关问题