Spark Dataset: merge two or more rows based on a filter and drop the remaining rows

a14dhokn · asked 2021-05-29 · Spark

I am only four days into learning Spark and am writing Spark jobs in Java 8. I need to merge two or more rows of a Dataset into a single row based on specific conditions, and then drop the remaining duplicate rows.
dataset.show()

ID | branch | name    | role | age
1  | HQ     | Harry   | DEV  | 24
1  | Berlin | Harry   | QA   | 24
1  | Hungary| Harry   | BA   | 24
2  | HQ     | Chris   | Prod | 39
2  | Chime  | Chris   | Prod | 39
2  | Cornell| Chris   | Acc  | 39
2  | Chappel| Chris   |      | 39

Desired output:

ID | branch | name    | role         | age
1  | HQ     | Harry   | "DEV,QA,BA"  | 24
2  | HQ     | Chris   | "Prod,Acc"   | 39

The rows should be merged using the following logic:
Merge rows that share the same ID, concatenating/combining the "role" values of all of those rows (no duplicates, no empty values) into the row where branch = "HQ".
Keep the row whose branch column is "HQ" and drop the other rows.
Here is what I wrote:

// needs: import static org.apache.spark.sql.functions.*;
// needs: import org.apache.spark.sql.expressions.Window;
// needs: import org.apache.spark.sql.expressions.WindowSpec;
Dataset<Row> mergeRoles(SparkSession sparkSession, Dataset<Row> dataset) {
    WindowSpec overCategory = Window.partitionBy("ID");
    // collect every role for the same ID into an array column
    dataset = dataset.withColumn("temp_role", collect_list("role").over(overCategory));
    // the temp view only exists so the registered mergeRole UDF can be called from SQL
    dataset.createOrReplaceTempView("tmp");
    dataset = sparkSession.sql("SELECT *, mergeRole(temp_role) as role FROM tmp")
                          .dropDuplicates("ID");
    return dataset;
}

I wrote a UDF, mergeRole, that merges the values of the "role" column. Unfortunately, I cannot use createOrReplaceTempView because of some memory issues. Please help; I need to find a better approach.
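For what it is worth, the temp view above is only there so the registered mergeRole UDF can be invoked from SQL. Below is a minimal sketch of the same logic without the view, calling the UDF through functions.callUDF; it assumes mergeRole has already been registered via sparkSession.udf().register(...), and mergeRolesNoView is just a hypothetical name.

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

Dataset<Row> mergeRolesNoView(Dataset<Row> dataset) {
    WindowSpec overCategory = Window.partitionBy("ID");
    return dataset
            // collect all roles per ID into an array column
            .withColumn("temp_role", collect_list("role").over(overCategory))
            // call the (assumed already registered) mergeRole UDF without any temp view
            .withColumn("role", callUDF("mergeRole", col("temp_role")))
            .drop("temp_role")
            // keeps one arbitrary row per ID, exactly as in the snippet above
            .dropDuplicates("ID");
}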


pbwdgjma · answer #1

Try this:

Load the test data

String data = "ID | branch | name    | role | age\n" +
                "1  | HQ     | Harry   | DEV  | 24\n" +
                "1  | Berlin | Harry   | QA   | 24\n" +
                "1  | Hungary| Harry   | BA   | 24\n" +
                "2  | HQ     | Chris   | Prod | 39\n" +
                "2  | Chime  | Chris   | Prod | 39\n" +
                "2  | Cornell| Chris   | Acc  | 39\n" +
                "2  | Chappel| Chris   |      | 39";

        List<String> list1 = Arrays.stream(data.split("\n")) // the literal uses "\n", so split on that rather than System.lineSeparator()
                .map(s -> Arrays.stream(s.split("\\|"))
                        .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                        .collect(Collectors.joining(","))
                )
                .collect(Collectors.toList());

        Dataset<Row> dataset = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .option("sep", ",")
                .csv(spark.createDataset(list1, Encoders.STRING()));
        dataset.show(false);
        dataset.printSchema();

        /**
         * +---+-------+-----+----+---+
         * |ID |branch |name |role|age|
         * +---+-------+-----+----+---+
         * |1  |HQ     |Harry|DEV |24 |
         * |1  |Berlin |Harry|QA  |24 |
         * |1  |Hungary|Harry|BA  |24 |
         * |2  |HQ     |Chris|Prod|39 |
         * |2  |Chime  |Chris|Prod|39 |
         * |2  |Cornell|Chris|Acc |39 |
         * |2  |Chappel|Chris|null|39 |
         * +---+-------+-----+----+---+
         *
         * root
         *  |-- ID: integer (nullable = true)
         *  |-- branch: string (nullable = true)
         *  |-- name: string (nullable = true)
         *  |-- role: string (nullable = true)
         *  |-- age: integer (nullable = true)
         */

Get the set of roles for the HQ branch

dataset.withColumn("role", collect_set(
                when(trim(col("role")).equalTo(lit("")), lit(null)).otherwise(col("role"))
        ).over(Window.partitionBy("ID")))
                .where("branch='HQ'")
                .show(false);
        /**
         * +---+------+-----+-------------+---+
         * |ID |branch|name |role         |age|
         * +---+------+-----+-------------+---+
         * |2  |HQ    |Chris|[Prod, Acc]  |39 |
         * |1  |HQ    |Harry|[QA, BA, DEV]|24 |
         * +---+------+-----+-------------+---+
         */
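
The desired output shows role as a single comma-separated string rather than an array. A sketch, reusing the same window and imports as above, that joins the collected set with concat_ws:

dataset.withColumn("role", concat_ws(",", collect_set(
                when(trim(col("role")).equalTo(lit("")), lit(null)).otherwise(col("role"))
        ).over(Window.partitionBy("ID"))))
        .where("branch='HQ'")
        .show(false);
        // role becomes e.g. "Prod,Acc" / "QA,BA,DEV"; element order from collect_set is not guaranteed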
