Swift合并合并多个发布商

mkh04yzy  于 2023-03-28  发布在  Swift
关注(0)|答案(2)|浏览(183)

我有一个构建多个发布者并使用MergeMany在一个发布者中返回所有发布者的函数。问题是一些用户可能在这个发布者中有很多端点,并且一次击中所有这些端点经常导致服务器超时。有没有一种方法可以限制合并中的并发网络请求(如DispatchSemaphore)?

let mergedPubs = Publishers.MergeMany(urlRequests.map { dataTaskPublisher(for: $0)
            .decode(type: RawJSON.self, decoder: JSONDecoder())
            .mapError { _ in
                return URLError(URLError.Code.badServerResponse)
            }
    })
        .collect()
        .eraseToAnyPublisher()
ryevplcw

ryevplcw1#

合并没有现成的解决方案,但我们可以在现有的Publishers.MergeManyPublishers.Concatenate上构建它。
其理念是:

  • 将输入数组划分为最大并发请求的块。例如,使用简单的Int数组[1, 2, 3, 4, 5, 6]maxConcurrent = 3,我们将得到[[1, 2, 3], [4, 5, 6]],因此请求1,2,3将并行执行,但4,5,6将仅在前一个块完成时开始。
  • 在这些子数组上使用Publishers.MergeMany
  • 将上一步骤中产生的每个新发布者连接起来。

为了实现这一点,我们需要利用Publishers.Concatenate来实现Publishers.ConcatenateMany,它只需要2个输入流。如果你想遵循合并风格,这应该在一个全新的结构中实现,但我现在在静态函数中实现了这一点。

extension Publishers {
  static func concatenateMany<Output, Failure>(_ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
    return publishers.reduce(Empty().eraseToAnyPublisher()) { acc, elem in
      Publishers.Concatenate(prefix: acc, suffix: elem).eraseToAnyPublisher()
    }
  }

用于分块数组的实用程序:

extension Array {
  func chunked(into size: Int) -> [[Element]] {
    return stride(from: 0, to: count, by: size).map {
      Array(self[$0 ..< Swift.min($0 + size, count)])
    }
  }
}

现在我们可以实现一个新版本的MergeMany,它也接受一个maxConcurrent参数。

extension Publishers {
  static func mergeMany<Output, Failure>(maxConcurrent: Int, _ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
    return Publishers.concatenateMany(
      publishers.chunked(into: maxConcurrent)
        .map {
          Publishers.MergeMany($0)
            .eraseToAnyPublisher()
        }
    )
  }
}

最后,你的代码看起来像这样:

let mergedPubs = Publishers.mergeMany(maxConcurrent: 3, requests)
  .collect()
  .eraseToAnyPublisher()

这只是一个想法,可能还有其他方法可以达到同样的效果!

polhcujo

polhcujo2#

虽然@Fabio Felici有一个很酷的并行化工作解决方案,但在这个特定的情况下,似乎你有一个错误没有在这里显示,而解决方案不在你的代码的这一部分。
因此,在这种情况下,每次调用方法dataTaskPublisher(for: $0)时,看起来就像是在创建一个新的会话。这使得您只受系统资源的限制,可以发出多少请求。这里的解决方案是保留一个对'URLSession'的引用,并从该单个示例创建数据任务发布者。创建会话时,你应该向会话的初始化器传递一个配置,在那个配置中,将httpMaximumConnectionsPerHost设置为你想要建立的最大并发连接数。你可以这样做:

struct RawJSON: Decodable {
    /// properties
}

class YourClass {
    
    let session = {
        /// Create a `URLSessionConfiguration`
        let configuration = URLSessionConfiguration()
        /// Set the maximum concurrent connections you want
        configuration.httpMaximumConnectionsPerHost = 10
        return URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
    }()

    /// I am assuming this is what your `dataTaskPublisher` method looks like
    func dataTaskPublisher(for url: URL) -> URLSession.DataTaskPublisher {
        return session.dataTaskPublisher(for: url)
    }

    /// This is your method that you posted here
    func yourMethod(urlRequests: [URL]) {
        let mergedPubs = Publishers.MergeMany(urlRequests.map { dataTaskPublisher(for: $0)
                    .decode(type: RawJSON.self, decoder: JSONDecoder())
                    .mapError { _ in
                        return URLError(URLError.Code.badServerResponse)
                    }
            })
                .collect()
                .eraseToAnyPublisher()
    }
}

如果你想更聪明一点,你可以根据你选择的算法动态地改变并发操作的数量。

struct RawJSON: Decodable {
    /// properties
}

class YourClass {
    
    let operationQueue: OperationQueue
    let session: URLSession
    init() {
        /// Create a `URLSessionConfiguration`
        let configuration = URLSessionConfiguration()
        /// Create `OperationQueue`
        operationQueue = OperationQueue()
        /// Set the maximum concurrent connections you want
        configuration.httpMaximumConnectionsPerHost = 10
        session = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
    }

    /// I am assuming this is what your `dataTaskPublisher` method looks like
    func dataTaskPublisher(for url: URL) -> URLSession.DataTaskPublisher {
        return session.dataTaskPublisher(for: url)
    }

    /// This is your method that you posted here.
    func yourMethod(urlRequests: [URL]) {
        let mergedPubs = Publishers.MergeMany(urlRequests.map { dataTaskPublisher(for: $0)
                    .decode(type: RawJSON.self, decoder: JSONDecoder())
                    .mapError { _ in
                        return URLError(URLError.Code.badServerResponse)
                    }
            })
                .collect()
                .eraseToAnyPublisher()
    }
    
    /// Call this to change the number of concurrent operations in the queue
    func changeCuncurrentOperationCount(maxOperations: Int) {
        operationQueue.maxConcurrentOperationCount = maxOperations
    }
}

你可以这样使用它:

相关问题