scala—一种高效的内存方法,可以按键重新划分大型数据集,并对每个组逐批分别应用一个函数

vhmi4jdf  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(229)

我有一个带有 "groupName" 列。数据记录沿着不同的分区分布。我想把记录按 "groupName" ,逐批收集并对整个批应用函数。我所说的“批处理”是指预定义数量的记录(我们称之为 maxBatchCount )属于同一组。我所说的“一批一批”是指我希望有效地使用内存,而不是将所有分区收集到内存中。
更具体地说,batch函数包括对整个批的序列化、压缩和加密。稍后,它将转换为另一个数据集,并使用 partitionBy("groupName") . 因此,我无法避免一个完整的洗牌。
有没有一个简单的方法?我做了下面描述的一些尝试,但tl/dr似乎有点过于复杂,最终在java内存问题上失败了。
细节
我试着用 repartition("groupName") , mapPartitions 以及 Iteratorgrouped(maxBatchCount) 似乎非常适合这项任务的方法。但是,重新分区只能确保相同的记录 groupName 将位于同一分区中,但单个分区可能具有来自多个不同分区的记录 groupName (如果#组>分区),它们可以分散在分区内。所以现在我仍然需要先在每个分区内进行分组。问题是从Map分区我得到一个 Iterator 它似乎没有这样的api,我不想收集所有的数据到内存中。
然后我尝试用 Iteratorpartition 方法。其思想是首先迭代整个分区以构建 Set 然后使用 Iterator.partition 为每个当前组构建一个单独的迭代器。然后使用 grouped 和以前一样。
它是这样的-为了举例说明,我使用了一个两个int的简单case类 groupName 实际上是 mod3 列,通过应用 modulo 3 每个的函数 number 范围:

case class Mod3(number: Int, mod3: Int)
  val maxBatchCount = 5
  val df = spark.sparkContext.parallelize(Range(1,21))
     .toDF("number").withColumn("mod3", col("number") % 3)

  // here I choose #partitions < #groups for illustration
  val dff = df.repartition(1, col("mod3"))

  val dsArr = dff.as[Mod3].mapPartitions(partitionIt => {
    // we'll need 2 iterations
    val (it1, it2) = partitionIt.duplicate

    // first iterate to create a Set of all present groups
    val mod3set = it1.map(_.mod3).toSet

    // build partitioned iterators map (one for each group present)
    var it: Iterator[Mod3] = it2 // init var
    val itMap = mod3set.map(mod3val => {
      val (filteredIt, residueIt) = it.partition(_.mod3 == mod3val)
      val pair = (mod3val -> filteredIt)
      it = residueIt
      pair
    }).toMap

    mod3set.flatMap(mod3val => {
      itMap(mod3val).grouped(maxBatchCount).map(grp => {
        val batch = grp.toList
        batch.map(_.number).toArray[Int] // imagine some other batch function
      })
    }).toIterator
  }).as[Array[Int]]

  val dsArrCollect = dsArr.collect
  dsArrCollect.map(_.toList).foreach(println)

在使用小数据进行测试时,这似乎工作得很好,但是在使用实际数据运行时(在一个实际的spark集群上,有20个执行器,每个执行器有2个内核),我收到了 java.lang.OutOfMemoryError: GC overhead limit exceeded 注意:在我的实际数据组中,大小是高度倾斜的,其中一个组的大小大约是所有其他组的大小(我猜gc内存问题与该组有关)。正因为如此,我还试图结合一个次要的中性列在 repartition 但没用。
谢谢你的指点,谢谢!

ryevplcw

ryevplcw1#

我认为重新分区+Map分区的方法是正确的。问题是map partition函数最终会将整个分区加载到内存中。
第一种解决方案是增加分区的数量,从而减少分区中的组/数据的数量。
另一种解决方案是使用partitionit.flatmap,一次处理1条记录,最多只能累积1组数据
使用sortwithinpartitions,以便来自同一组的记录是连续的
在flatmap函数中,积累数据并跟踪组更改。

相关问题