spark udaf/aggregator按顺序处理记录组

t98cgbkg  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(267)

我想用spark做一些定制的groupby聚合,它需要按顺序(时间戳)处理记录,第n条记录的处理需要处理前(n-1)条记录的输出(听起来有点像流式处理任务?)。输入在按日期划分的一大组文件中。
我目前的解决方案是实现一个自定义 org.apache.spark.sql.expressions.Aggregator ,它以增量方式将所有输入记录插入缓冲区,并在最后进行所有聚合。伪代码如下:

class MyAgg extends Aggregator[IN, SortedList[IN], OUT] {
    override def zero: SortedList[IN] = SortedList.empty

    override def reduce(b: SortedList[IN], e: Event): SortedList[IN] =
        insert_into_b(e)

    override def merge(b1: SortedList[IN], b2: SortedList[IN]): SortedList[IN] =
        merge_two_lists(b1, b2)

    override def finish(b: SortedList[IN]): OUT =
        my_main_aggregation_happens_here:
            b.foldLeft ...
}

val result = myInputDS.groupBy(_.key).agg((new MyAgg()).toColumn)

这个解决方案可行,但我非常关心性能,因为reduce阶段根本不会减少任何内容,所有记录都需要存储在内存中直到最后。我希望有更好的解决办法。
你能帮忙吗?谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题