scala—如何在聚合函数中应用自定义逻辑

xmakbtuz  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(348)

我正在学习spark,假设我们有以下Dataframe
用户ID活动1 liked2 comment1 liked1 liked1 comment2 liked
每种类型的活动都有自己的权重,用于计算得分
活动权重1社区3
这就是期望的输出
用户idscore1624
分数的计算包括计算一个事件发生的次数以及它们的权重。例如,用户1执行3个likes和一条评论,因此权重由

(3 * 1) + (1 * 3)

如何在spark中进行此计算?
我的初步尝试如下

val df1 = evidenceDF
      .groupBy("user_id")
      .agg(collect_set("event") as "event_ids")

但我被Map部分卡住了。我想要实现的是在我将事件聚合到 event_ids 字段中,我将拆分它们并在一个Map函数中进行计算,但我很难进一步移动。
我搜索了关于使用自定义聚合器函数的信息,但听起来很复杂,有没有一种直接的方法可以做到这一点?

vfh0ocws

vfh0ocws1#

您可以使用权重数据框联接分组依据和权重总和:

val df1 = evidenceDF.join(df_weight, Seq("activity"))
  .groupBy("user_id")
  .agg(
    sum(col("weight")).as("score")
  )

df1.show

//+-------+-----+
//|user_id|score|
//+-------+-----+
//|      1|    6|
//|      2|    4|
//+-------+-----+

或者如果你实际上只有两个类别,那么使用 when 直接在 sum :

val df1 = evidenceDF.groupBy("user_id")
  .agg(
    sum(
      when(col("activity") === "liked", 1)
        .when(col("activity") === "comment", 3)
    ).as("score")
  )

相关问题