我有一个构建多个发布者并使用MergeMany在一个发布者中返回所有发布者的函数。问题是一些用户可能在这个发布者中有很多端点,并且一次击中所有这些端点经常导致服务器超时。有没有一种方法可以限制合并中的并发网络请求(如DispatchSemaphore)?
let mergedPubs = Publishers.MergeMany(urlRequests.map { dataTaskPublisher(for: $0)
.decode(type: RawJSON.self, decoder: JSONDecoder())
.mapError { _ in
return URLError(URLError.Code.badServerResponse)
}
})
.collect()
.eraseToAnyPublisher()
2条答案
按热度按时间ryevplcw1#
合并没有现成的解决方案,但我们可以在现有的
Publishers.MergeMany
和Publishers.Concatenate
上构建它。其理念是:
Int
数组[1, 2, 3, 4, 5, 6]
和maxConcurrent = 3
,我们将得到[[1, 2, 3], [4, 5, 6]]
,因此请求1,2,3将并行执行,但4,5,6将仅在前一个块完成时开始。Publishers.MergeMany
。为了实现这一点,我们需要利用
Publishers.Concatenate
来实现Publishers.ConcatenateMany
,它只需要2个输入流。如果你想遵循合并风格,这应该在一个全新的结构中实现,但我现在在静态函数中实现了这一点。用于分块数组的实用程序:
现在我们可以实现一个新版本的
MergeMany
,它也接受一个maxConcurrent
参数。最后,你的代码看起来像这样:
这只是一个想法,可能还有其他方法可以达到同样的效果!
polhcujo2#
虽然@Fabio Felici有一个很酷的并行化工作解决方案,但在这个特定的情况下,似乎你有一个错误没有在这里显示,而解决方案不在你的代码的这一部分。
因此,在这种情况下,每次调用方法
dataTaskPublisher(for: $0)
时,看起来就像是在创建一个新的会话。这使得您只受系统资源的限制,可以发出多少请求。这里的解决方案是保留一个对'URLSession'的引用,并从该单个示例创建数据任务发布者。创建会话时,你应该向会话的初始化器传递一个配置,在那个配置中,将httpMaximumConnectionsPerHost
设置为你想要建立的最大并发连接数。你可以这样做:如果你想更聪明一点,你可以根据你选择的算法动态地改变并发操作的数量。
你可以这样使用它: