如何最小化akka流中的内存使用

iaqfqrcu  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(158)

我有一个流,它在某个时候会将对象分组以创建文件。我想我可以通过在流中早期序列化对象来压缩一些字节。但我最大的问题是如何优化像这样的流的内存占用:

val sourceOfCustomer = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString

sourceOfCustomers
.via(serializeCustomer) // 1KB
.grouped(1000000) // 1GB
.via(processFile) // 1GB
.via(moreProcessing) // 1GB
.via(evenMoreProcessing) // 1GB
.to(fileSink) // 1GB

这会使我在稳定状态下使用的内存至少为5GB。这是否正确?
我可以使用什么策略来将其限制为1或2GB?原则上,通过折叠操作符应该是可能的。
注意:我知道一个解决方案是使小组更小,但让我们考虑小组的大小问题的约束。

qeeaahzv

qeeaahzv1#

对不起,可能我遗漏了一些东西,但我没有在最新的Akka Stream文档中找到group操作,我猜你是指grouped操作:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html
如果是这样的话,那么就意味着在.grouped(1000000) // 1GB中,您在流中创建了一组元素,这些元素可以同时处理,因此一个或多个1GB大小的组可以在一个时刻出现在内存中。因此,为了将流中的内存占用限制在1GB以内,您可以使用以下两种方法之一:
1)减少同时处理的大型组的数量。这可以通过throttle操作来实现:请https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle参见示例代码片段

import scala.concurrent.duration._
...

.group(1000000) // 1GB
.throttle(1, 1 minute)

2)减少大型组的大小

val parallelismLevel = Runtime.getRuntime.availableProcessors() // or another custom level which represents stream processing parallelism
val baseGroupSize = 1000000 // 1GB
val groupSize =  baseGroupSize / parallelismLevel 

sourceOfCustomers
.via(serializeCustomer) // 1KB
.group(groupSize)

希望这对你有帮助!

相关问题