javaspark

qlzsbp2j  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(430)

我有下面的Dataframe。

Column_1 Column_2
1        A
1        X
2        X
3        B
3        X
4        C
4        D

在上面的数据框中,第1列中可以有多条相同值的记录。我只需要删除那些条目数大于且在第2列中有x的记录。如果列2有两个不同的值,比如c和d,我必须保留它们。只有当一个记录有多个条目但其中一个条目有x时,我才必须将它们从Dataframe中删除。请注意,如果在第2列中只有一条记录带有x,则不应删除该记录。
预期产量:

Column_1 Column_2
1        A
2        X
3        B
4        C
4        D

请让我知道这是否可以在java spark中实现。我能够完全删除x记录,但不知道如何实现上述目标。
谢谢您。

oknwwptz

oknwwptz1#

完整的解释内联工作代码,输入csv看起来像

Column_1,Column_2
1,A
1,X
2,X
3,B
3,X
4,C
4,D
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

public class DropDups {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        Dataset<Row> ds = spark.read()
                .option("header", "true")
                .csv("src/main/resources/duplicateRec.csv");

        ds.show();

/* Ouputs
+--------+--------+
|Column_1|Column_2|
+--------+--------+
|       1|       A|
|       1|       X|
|       2|       X|
|       3|       B|
|       3|       X|
|       4|       C|
|       4|       D|
+--------+--------+
 */

        //Group by Column_1 and collect set of elements from Column_2 and remove 'X' from the set
        ds = ds.groupBy(ds.col("Column_1")).agg(
                array_remove(collect_set(ds.col("Column_2")), lit("X")).as("Column_2_list"));

        // if the set is empty then ["X"] else the actual set
        ds = ds.withColumn("Column_2_array",
                when(size(ds.col("Column_2_list")).equalTo(0), lit("X".split(",")))
                        .otherwise(ds.col("Column_2_list")));

        //Replace the column and drop the extra columns
        ds.withColumn("Column_2", explode(ds.col("Column_2_array")))
                .drop("Column_2_list", "Column_2_array")
                .show();
/* Ouputs
+--------+--------+
|Column_1|Column_2|
+--------+--------+
|       3|       B|
|       1|       A|
|       4|       C|
|       4|       D|
|       2|       X|
+--------+--------+
         */
    }
}
relj7zay

relj7zay2#

它是scala,但java看起来几乎相同:

df.withColumn("id",row_number().over(Window.orderBy("c_1").partitionBy("c_1")))
  .where(!('c_2==="X" and 'id > 1))

+----+----+
| c_1| c_2|
+----+----+
|   1|   A|
|   3|   B|
|   4|   C| 
|   4|   D|
|   2|   X|
+----+----+

相关问题