我有一个带有 "groupName"
列。数据记录沿着不同的分区分布。我想把记录按 "groupName"
,逐批收集并对整个批应用函数。我所说的“批处理”是指预定义数量的记录(我们称之为 maxBatchCount
)属于同一组。我所说的“一批一批”是指我希望有效地使用内存,而不是将所有分区收集到内存中。
更具体地说,batch函数包括对整个批的序列化、压缩和加密。稍后,它将转换为另一个数据集,并使用 partitionBy("groupName")
. 因此,我无法避免一个完整的洗牌。
有没有一个简单的方法?我做了下面描述的一些尝试,但tl/dr似乎有点过于复杂,最终在java内存问题上失败了。
细节
我试着用 repartition("groupName")
, mapPartitions
以及 Iterator
的 grouped(maxBatchCount)
似乎非常适合这项任务的方法。但是,重新分区只能确保相同的记录 groupName
将位于同一分区中,但单个分区可能具有来自多个不同分区的记录 groupName
(如果#组>分区),它们可以分散在分区内。所以现在我仍然需要先在每个分区内进行分组。问题是从Map分区我得到一个 Iterator
它似乎没有这样的api,我不想收集所有的数据到内存中。
然后我尝试用 Iterator
的 partition
方法。其思想是首先迭代整个分区以构建 Set
然后使用 Iterator.partition
为每个当前组构建一个单独的迭代器。然后使用 grouped
和以前一样。
它是这样的-为了举例说明,我使用了一个两个int的简单case类 groupName
实际上是 mod3
列,通过应用 modulo 3
每个的函数 number
范围:
case class Mod3(number: Int, mod3: Int)
val maxBatchCount = 5
val df = spark.sparkContext.parallelize(Range(1,21))
.toDF("number").withColumn("mod3", col("number") % 3)
// here I choose #partitions < #groups for illustration
val dff = df.repartition(1, col("mod3"))
val dsArr = dff.as[Mod3].mapPartitions(partitionIt => {
// we'll need 2 iterations
val (it1, it2) = partitionIt.duplicate
// first iterate to create a Set of all present groups
val mod3set = it1.map(_.mod3).toSet
// build partitioned iterators map (one for each group present)
var it: Iterator[Mod3] = it2 // init var
val itMap = mod3set.map(mod3val => {
val (filteredIt, residueIt) = it.partition(_.mod3 == mod3val)
val pair = (mod3val -> filteredIt)
it = residueIt
pair
}).toMap
mod3set.flatMap(mod3val => {
itMap(mod3val).grouped(maxBatchCount).map(grp => {
val batch = grp.toList
batch.map(_.number).toArray[Int] // imagine some other batch function
})
}).toIterator
}).as[Array[Int]]
val dsArrCollect = dsArr.collect
dsArrCollect.map(_.toList).foreach(println)
在使用小数据进行测试时,这似乎工作得很好,但是在使用实际数据运行时(在一个实际的spark集群上,有20个执行器,每个执行器有2个内核),我收到了 java.lang.OutOfMemoryError: GC overhead limit exceeded
注意:在我的实际数据组中,大小是高度倾斜的,其中一个组的大小大约是所有其他组的大小(我猜gc内存问题与该组有关)。正因为如此,我还试图结合一个次要的中性列在 repartition
但没用。
谢谢你的指点,谢谢!
1条答案
按热度按时间ryevplcw1#
我认为重新分区+Map分区的方法是正确的。问题是map partition函数最终会将整个分区加载到内存中。
第一种解决方案是增加分区的数量,从而减少分区中的组/数据的数量。
另一种解决方案是使用partitionit.flatmap,一次处理1条记录,最多只能累积1组数据
使用sortwithinpartitions,以便来自同一组的记录是连续的
在flatmap函数中,积累数据并跟踪组更改。