Spark java group by和filter on custom condition

u4dcyp6a  于 2023-03-23  发布在  Apache
关注(0)|答案(2)|浏览(136)

我有一个类似于下面的问题陈述,我想在java中的spark job中编写它。
输入就像下面的地方有一个人的名字和他们的分数
| 姓名|得分|
| - ------|- ------|
| A类|十七|
| A类|十九|
| A类|二十二|
| B|十七|
| B|十五|
我想要的输出,如果一个人得分超过20,那么这个人的所有记录都应该在输出中。因为只有人A得分,所以这个人的记录在输出中。
| 姓名|得分|
| - ------|- ------|
| A类|十七|
| A类|十九|
| A类|二十二|
到目前为止,我已经尝试了这段代码,但我正在寻找一种更简单的方法来实现它。

sparkSession.read()
    .csv("scores.csv")
    .toDF("Name", "Score")
    .groupBy("Name")
    .agg(collect_list("Score").as("Score"))
    .filter((FilterFunction<Row>) row -> {
        WrappedArray<String> scores = (WrappedArray<String>) row.get(1);
        return scores.exists(v1 -> Long.parseLong(v1) > 20);
    })
    .withColumn("Score", explode(col("Score")))
    .show(false);

我希望优化这个过滤函数。在我原来的问题中,行的结构很复杂,我想使用更简单的sql条件表达式,如any element with score > 20
有没有办法可以简化。

mec1mxoz

mec1mxoz1#

你可以用一个按名称分区的窗口来做这件事,如下所示:

df
    .withColumn("max_score", max("Score").over(Window.partitionBy("Name")))
    .where(col("max_score").geq(20))
    .show(false);
bkhjykvo

bkhjykvo2#

您可以尝试将joinfilter一起使用;首先,你得到一个name的不同列表,这些name至少有一个score超过20,如下所示:

val filteredData = dataset
  .filter(col("score").gt(20))
  .selectExpr("name as r_name").distinct()

然后,您可以使用inner join只保留您想要的数据:

val finalDS = dataset
  .join(filteredData, dataset.col("name") === filteredData.col("r_name"), "inner")
  .select("name", "score")

最终结果:

+----+-----+
|name|score|
+----+-----+
|A   |17   |
|A   |19   |
|A   |21   |
+----+-----+

祝你好运!

相关问题