scala Spark按百分比分配数据

kmbjn2e3  于 2022-11-09  发布在  Scala
关注(0)|答案(1)|浏览(827)

假设我有多个客户服务级别(高级、基本和免费),每个级别都有几个专门的支持团队:

  • 高级=>紫色、蓝色、绿色和黄色
  • BASIC=>红白黑
  • 免费=>橙色和粉色

每个客户都有一个特定的客户服务层。
我正在努力实现的是为我的每个客户指定一个特定的支持团队,让他们知道他们的客户服务级别。此外,我希望特定客户服务级别的团队要处理的客户数量大致相同。
当客户少于分配到特定客户支持层的团队时,我并不真正关心分配给哪个团队。我只是希望每个团队处理的客户数量大致相同。
在此示例中,我的基本数据集如下所示:

一些可能的输出:

我真的想不出一个办法来对付斯帕克,有谁能帮我起来吗?

l3zydbqr

l3zydbqr1#

好的,让我们一步一步地解决这个问题。如果我应该这样做,我会首先创建一个Map,它Map每个组及其可能的值的数量,以避免对每一行进行重新计算,如下所示:

// groupsDF is the single column df in your question
val groupsAvailableCount: Map[Int, Long] = 
  groupsDF
    .groupBy("group")
    .count
    .as[(Int, Long)]
    .collect.toMap
// result would be: Map(1 -> 2, 2 -> 3, 3 -> 4)

现在第二部分有点棘手,因为正如您所解释的,您的组中每个值的概率是相同的(就像在组1中一样,所有值的概率都是0.25),而它们在实际问题中的概率可能不相同。无论如何,这是一个有概率的排列,你可以很容易地决定如何对你的问题进行排序。这第二部分的好处是,它将所有这些概率问题的排列抽象为一个函数,您可以根据需要对其进行更改,而您的其余代码将不受更改的影响:

def getWithProbabilities(groupId: Int, probs: Map[String, Double]): List[String] = {
  def getValues(groupId: Int, probabilities: List[(String, Double)]): List[String] = {
    // here is the logic, you can change the sorting and other stuff as you would want to
    val take = groupsAvailableCount.getOrElse(groupId, 0L).toInt
    if (take > probabilities.length) getValues(groupId, probabilities ::: probabilities)
    else probabilities.sortBy(_._2).take(take).map(_._1)
  }
  getValues(groupId, probs.toList)
}

因此,现在,您以某种方式神奇地(函数抽象的事情)能够从基于规范的每个组中获得您想要的值!差不多完成了,现在您只需要不同的groupID,并且给定组,您可以获取它们的值,并创建您的行:

groupsDF.distinct.as[Int]. // distinct group ids
  .collect.toList          // collect as scala list, to do the mappings
  .map(groupId => groupId -> getWithProbabilities(groupId, spec(groupId))) // here you calculate the values for the group, based on spec map
  .flatMap {
    case (groupId, values) => values.map(groupId -> _)
  } // inline the results to create unique rows
  .toDF("group", "value") // create your dataframe out of it

其结果将是:

+-----+-------------+
|group|        value|
+-----+-------------+
|    1|  value_one_x|
|    1|  value_two_x|
|    2|  value_one_y|
|    2|  value_two_y|
|    2|value_three_y|
|    3|  value_one_z|
|    3|  value_two_z|
|    3|  value_one_z|
|    3|  value_two_z|
+-----+-------------+

更新:

因此,要使用Spark API而不使用collect,可以使用UDF,它基本上调用了我们之前的函数:

val getWithProbabilitiesUDF = udf { (groupId: Int) => 
  getWithProbabilities(groupId, spec(groupId))
}

最后,只需在您拥有的组 Dataframe 上调用它:

groupsDF
  .distinct
  .select(
    col("group"),
    explode(getWithProbabilitiesUDF(col("group"))) as "value"
  )

相关问题