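I'm writing a Spark application in Java and have run into a problem. After grouping rows by particular columns, I need to concatenate the strings from the different rows in each group. Any help is appreciated, thanks!

[Image: input dataset]

[Image: expected output dataset]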
Answer 1
Use `collect_list` when you group, then use the `concat_ws` function to build a single string from the collected list.
```
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

df.show(false)
+-------------------------------------+------+---------------+---------------+----------------+-------+
|Errors                               |userid|associationtype|associationrank|associationvalue|sparkId|
+-------------------------------------+------+---------------+---------------+----------------+-------+
|Primary Key Constraint Violated      |3     |Brand5         |error          |Lee             |4      |
|Incorrect datatype in associationrank|3     |Brand5         |error          |Lee             |4      |
+-------------------------------------+------+---------------+---------------+----------------+-------+

// Group by every column except Errors, collect the Errors values, then join them with ", "
df.groupBy("userid", "associationtype", "associationrank", "associationvalue", "sparkId")
  .agg(collect_list("Errors").as("Errors"))
  .withColumn("Errors", concat_ws(", ", col("Errors")))
  .show(false)
+------+---------------+---------------+----------------+-------+----------------------------------------------------------------------+
|userid|associationtype|associationrank|associationvalue|sparkId|Errors                                                                |
+------+---------------+---------------+----------------+-------+----------------------------------------------------------------------+
|3     |Brand5         |error          |Lee             |4      |Primary Key Constraint Violated, Incorrect datatype in associationrank|
+------+---------------+---------------+----------------+-------+----------------------------------------------------------------------+
```
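The answer's code is Scala, while the question asks about Java. Spark's Java API exposes the same functions as static methods on `org.apache.spark.sql.functions`, so the chain should carry over almost one to one. To make the semantics concrete without a Spark cluster, here is a minimal plain-Java sketch using `java.util.stream`. It is not Spark. It only mimics `groupBy(...)` + `collect_list("Errors")` + `concat_ws(", ", ...)` on the two sample rows. The names `GroupConcatSketch`, `Row`, `key` and `groupAndConcat` are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupConcatSketch {
    // One input row (hypothetical class); field names mirror the sample DataFrame columns.
    static final class Row {
        final String errors, userid, associationtype, associationrank, associationvalue, sparkId;

        Row(String errors, String userid, String associationtype,
            String associationrank, String associationvalue, String sparkId) {
            this.errors = errors;
            this.userid = userid;
            this.associationtype = associationtype;
            this.associationrank = associationrank;
            this.associationvalue = associationvalue;
            this.sparkId = sparkId;
        }

        // Grouping key: every column except Errors, matching the groupBy(...) call in the answer.
        List<String> key() {
            return Arrays.asList(userid, associationtype, associationrank, associationvalue, sparkId);
        }
    }

    // Plain-Java analogue of groupBy(key).agg(collect_list("Errors")) + concat_ws(", ", ...).
    // LinkedHashMap plus a sequential stream keep input order; Spark gives no such guarantee.
    static Map<List<String>, String> groupAndConcat(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                Row::key,
                LinkedHashMap::new,
                Collectors.mapping((Row r) -> r.errors, Collectors.joining(", "))));
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("Primary Key Constraint Violated", "3", "Brand5", "error", "Lee", "4"),
                new Row("Incorrect datatype in associationrank", "3", "Brand5", "error", "Lee", "4"));
        // Prints: [3, Brand5, error, Lee, 4] -> Primary Key Constraint Violated, Incorrect datatype in associationrank
        groupAndConcat(rows).forEach((key, errors) -> System.out.println(key + " -> " + errors));
    }
}
```

In this sketch the joined order simply follows the input order. Spark does not guarantee the element order of `collect_list` after a shuffle, so if the order of the concatenated errors matters, sort the collected list (for example with `sort_array`) before calling `concat_ws`.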
Answer 2
Try the following code.
```
// spark-shell already has spark.implicits._ (needed for $"...") in scope;
// enter the multi-line chain with :paste
import org.apache.spark.sql.functions._

sdf
  .groupBy("sparkid")
  .agg(collect_set($"errors").as("error_list"), first(struct($"*")).as("data"))
  .select($"data.*", concat_ws(",", $"error_list").as("errors_new"))
  .show(false)
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
|errors                               |userid|associationtype|associationrank|associationvalue|sparkid|errors_new                                                           |
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
|Incorrect datatype in associationrank|8     |brand3         |dd             |LeeNew          |7      |Incorrect datatype in associationrank                                |
|Incorrect datatype in associationrank|4     |brand4         |null           |Lee             |3      |Incorrect datatype in associationrank                                |
|Incorrect datatype in associationrank|1     |brand1         |iuy            |Lee             |0      |Incorrect datatype in associationrank                                |
|Primary Key Constraint Violated      |2     |brand1         |something      |Lee             |5      |Primary Key Constraint Violated,Incorrect datatype in associationrank|
|Primary Key Constraint Violated      |2     |brand2         |22             |Lee             |1      |Primary Key Constraint Violated                                      |
|Primary Key Constraint Violated      |3     |brand5         |error          |Lee             |4      |Primary Key Constraint Violated,Incorrect datatype in associationrank|
|Primary Key Constraint Violated      |3     |brand3         |40             |LeeNew          |2      |Primary Key Constraint Violated                                      |
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
```
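This answer groups only by `sparkid`. It uses `collect_set`, which drops duplicate error strings, and it uses `first(struct($"*"))` to carry along the other columns of one row per group. Below is a plain-Java sketch of the same idea. Like the previous one, it is not Spark, it uses hypothetical names (`FirstRowPlusErrorsSketch`, `Row`, `Result`, `groupBySparkId`), and it works on a reduced set of columns. It keeps the first row seen for each `sparkId` and joins that group's distinct errors with a comma.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FirstRowPlusErrorsSketch {
    // Reduced input row (hypothetical): only a few of the sample columns are kept.
    static final class Row {
        final String errors, userid, associationtype, sparkId;

        Row(String errors, String userid, String associationtype, String sparkId) {
            this.errors = errors;
            this.userid = userid;
            this.associationtype = associationtype;
            this.sparkId = sparkId;
        }
    }

    // The first row seen for a sparkId plus its distinct errors joined with ",".
    static final class Result {
        final Row first;
        final String errorsNew;

        Result(Row first, String errorsNew) {
            this.first = first;
            this.errorsNew = errorsNew;
        }
    }

    static Map<String, Result> groupBySparkId(List<Row> rows) {
        Map<String, Row> firstRow = new LinkedHashMap<>();
        Map<String, Set<String>> distinctErrors = new LinkedHashMap<>();
        for (Row r : rows) {
            // Keep only the first row per key (rough analogue of first(struct($"*"))).
            firstRow.putIfAbsent(r.sparkId, r);
            // Collect errors without duplicates (rough analogue of collect_set).
            distinctErrors.computeIfAbsent(r.sparkId, k -> new LinkedHashSet<>()).add(r.errors);
        }
        Map<String, Result> out = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : firstRow.entrySet()) {
            // Join the distinct errors into one string (analogue of concat_ws(",", ...)).
            String joined = String.join(",", distinctErrors.get(e.getKey()));
            out.put(e.getKey(), new Result(e.getValue(), joined));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("Primary Key Constraint Violated", "3", "brand5", "4"));
        rows.add(new Row("Incorrect datatype in associationrank", "3", "brand5", "4"));
        rows.add(new Row("Primary Key Constraint Violated", "3", "brand5", "4")); // duplicate error, dropped
        rows.add(new Row("Primary Key Constraint Violated", "2", "brand2", "1"));
        groupBySparkId(rows).forEach((id, res) ->
                System.out.println(id + " | " + res.first.userid + " | "
                        + res.first.associationtype + " | " + res.errorsNew));
    }
}
```

The sketch relies on insertion order. In Spark, `first` is non-deterministic after a shuffle and `collect_set` does not preserve order, so which row is kept and the order of the joined errors may vary between runs.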