我有一个流,它在某个时候会将对象分组以创建文件。我想我可以通过在流中早期序列化对象来压缩一些字节。但我最大的问题是如何优化像这样的流的内存占用:
val sourceOfCustomer = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString
sourceOfCustomers
.via(serializeCustomer) // 1KB
.grouped(1000000) // 1GB
.via(processFile) // 1GB
.via(moreProcessing) // 1GB
.via(evenMoreProcessing) // 1GB
.to(fileSink) // 1GB
这会使我在稳定状态下使用的内存至少为5GB。这是否正确?
我可以使用什么策略来将其限制为1或2GB?原则上,通过折叠操作符应该是可能的。
注意:我知道一个解决方案是使小组更小,但让我们考虑小组的大小问题的约束。
1条答案
按热度按时间qeeaahzv1#
对不起,可能我遗漏了一些东西,但我没有在最新的Akka Stream文档中找到
group
操作,我猜你是指grouped
操作:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html。如果是这样的话,那么就意味着在
.grouped(1000000) // 1GB
中,您在流中创建了一组元素,这些元素可以同时处理,因此一个或多个1GB大小的组可以在一个时刻出现在内存中。因此,为了将流中的内存占用限制在1GB以内,您可以使用以下两种方法之一:1)减少同时处理的大型组的数量。这可以通过
throttle
操作来实现:请https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle参见示例代码片段2)减少大型组的大小
希望这对你有帮助!