我使用的是spark-sql-2.3.1v,具有以下场景:
给定数据集:
val ds = Seq(
(1, "x1", "y1", "0.1992019"),
(2, null, "y2", "2.2500000"),
(3, "x3", null, "15.34567"),
(4, null, "y4", null),
(5, "x4", "y4", "0")
).toDF("id","col_x", "col_y","value")
即
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 2| null| y2|2.2500000|
| 3| x3| null| 15.34567|
| 4| null| y4| null|
| 5| x4| y4| 0|
+---+-----+-----+---------+
要求:
我得到操作栏(即。, operationCol
)我需要从外部进行一些计算。
在对列“col\ux”执行某些操作时,我需要通过过滤掉所有具有“col\ux”空值的记录来创建一个新的数据集,并返回该新数据集。
同样,在对列“col\y”执行某些操作时,我需要通过过滤掉所有具有“col\y”空值的记录并返回新的数据集来创建一个新的数据集。
例子:
val operationCol ="col_x";
if(operationCol === "col_x"){
//filter out all rows which has "col_x" null and return that new dataset.
}
if(operationCol === "col_y"){
//filter out all rows which has "col_y" null and return that new dataset.
}
当operationcol==“col\u x”预期输出时:
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 3| x3| null| 15.34567|
| 5| x4| y4| 0|
+---+-----+-----+---------+
当operationcol==“col\u y”预期输出时:
+---+-----+-----+---------+
| id|col_x|col_y| value|
+---+-----+-----+---------+
| 1| x1| y1|0.1992019|
| 2| null| y2|2.2500000|
| 4| null| y4| null|
| 5| x4| y4| 0|
+---+-----+-----+---------+
如何实现这一预期产出?换句话说,如何进行Dataframe的分支?如何在流的中间创建一个新的dataframe/dataset?
2条答案
按热度按时间e4yzc0pl1#
你也可以使用
filter
筛选空值。wztqucjr2#
你可以用
df.na.drop()
删除包含空值的行。drop函数可以将要考虑的列的列表作为输入,因此在本例中,可以按如下方式编写:这将创建一个新的Dataframe
newDf
所有行都在哪里operationCol
已删除。