Swift合并替代Rx Observable.create

txu3uszq  于 2023-06-21  发布在  Swift
关注(0)|答案(5)|浏览(111)

我有一些使用RxSwift构建的代码,我正在尝试将其转换为使用Apple的合并框架。
一种非常常见的模式是将Observable.create用于单次可观测(通常是网络请求)。就像这样:

func loadWidgets() -> Observable<[Widget]> {
  return Observable.create { observer in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      observer.onNext(widgets)
      observer.onComplete()
    }, error: { error in
      // publish error on failure
      observer.onError()
    })
    // allow cancellation
    return Disposable {
      loadTask.cancel()
    }
  }
}

我试着把它Map到合并上,但我还没能弄清楚。我所能得到的最接近的是使用Future来做这样的事情:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
  return Future<[Widget], Error> { resolve in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      resolve(.success(widgets))
    }, error: { error in
      // publish error on failure
      resolve(.failure(error))
    })
    // allow cancellation ???
  }
}

正如你所看到的,它做了大部分,但没有取消的能力。其次,future不允许有多个结果。
有没有什么方法可以像Rx Observable.create模式一样,允许取消和可选的多个结果?

enyaitl3

enyaitl31#

我想我找到了一种在Combine中使用PassthroughSubject来模拟Observable.create的方法。下面是我做的助手:

struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onComplete: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

下面是它在使用中的样子:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
    AnyPublisher.create { observer in
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
          observer.onNext(widgets)
          observer.onComplete()
        }, error: { error in
          observer.onError(error)
        })
        return Disposable {
          loadTask.cancel()
        }
    }
}
93ze6v8z

93ze6v8z2#

据我所知,在Xcode 11 beta 3中已经放弃了对使用闭包初始化AnyPublisher的支持。在这种情况下,这将是Rx的Observable.create的相应解决方案,但现在我相信,如果您只需要传播单个值,Future是一个后藤解决方案。在其他情况下,我会返回一个PassthroughSubject并以这种方式传播多个值,但它不允许您在观察开始时启动一个任务,我相信与Observable.create相比,它远非理想。
在取消方面,它没有类似于DisposableisDisposed属性,因此无法直接检查它的状态并停止执行自己的任务。我现在能想到的唯一方法是观察cancel事件,但它肯定没有Disposable那么舒服。此外,我假设cancel实际上可能会停止来自URLSession的网络请求等任务,这是基于以下文档:https://developer.apple.com/documentation/combine/cancellable

ivqmmu1c

ivqmmu1c3#

在闭包外部添加isCancelled操作,并在future的闭包中检查它。isCancelled可以使用handleEvent()操作符进行切换。

var isCancelled = false
    func loadWidgets() -> AnyPublisher<[Widget], Error> {
    return HandleEvents<Future<Any, Error>> { resolve in
        // start the request when someone subscribes
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
            // publish result on success
            resolve(.success(widgets))
        }, error: { error in
            // publish error on failure
            resolve(.failure(error))   
        }
        if isCancelled {
            loadTask.cancel()
        }  
        ).handleEvents(receiveCancel: {
        isCancelled = true
        })
    }
}

在应用程序中的某个地方,您可以执行此操作以取消活动

loadWidgets().cancel()

也检查此article

kb5ga3dv

kb5ga3dv4#

感谢ccwasden提供的灵感。这使用纯合并实现复制了Observable.create语义,没有任何多余的实体。

AnyPublisher.create()Swift 5.6扩展

public extension AnyPublisher {
    
    static func create<Output, Failure>(_ subscribe: @escaping (AnySubscriber<Output, Failure>) -> AnyCancellable) -> AnyPublisher<Output, Failure> {
        
        let passthroughSubject = PassthroughSubject<Output, Failure>()
        var cancellable: AnyCancellable?
        
        return passthroughSubject
            .handleEvents(receiveSubscription: { subscription in

                let subscriber = AnySubscriber<Output, Failure> { subscription in

                } receiveValue: { input in
                    passthroughSubject.send(input)
                    return .unlimited
                } receiveCompletion: { completion in
                    passthroughSubject.send(completion: completion)
                }
                
                cancellable = subscribe(subscriber)
                
            }, receiveCompletion: { completion in
                
            }, receiveCancel: {
                cancellable?.cancel()
            })
            .eraseToAnyPublisher()
        
    }
    
}

用法

func doSomething() -> AnyPublisher<Int, Error> {
    
    return AnyPublisher<Int, Error>.create { subscriber in
        
        // Imperative implementation of doing something can call subscriber as follows
        _ = subscriber.receive(1)
        subscriber.receive(completion: .finished)
        // subscriber.receive(completion: .failure(myError))
        
        return AnyCancellable {
            // Imperative cancellation implementation
        }
    }
    
}
tuwxkamq

tuwxkamq5#

下面是一个解决方案,它可以用公认的解决方案解决问题:

extension AnyPublisher {
    struct Subscriber {
        fileprivate let send: (Output) -> Void
        fileprivate let complete: (Subscribers.Completion<Failure>) -> Void

        func send(_ value: Output) { self.send(value) }
        func send(completion: Subscribers.Completion<Failure>) { self.complete(completion) }
    }

    init(queue: DispatchQueue = .main, _ closure: @escaping (Subscriber) -> AnyCancellable) {
        self = Deferred {
            let subject = PassthroughSubject<Output, Failure>()
            var cancellable: AnyCancellable?

            return subject
                .handleEvents(
                    receiveCancel: { cancellable?.cancel() },
                    receiveRequest: { demand in
                        precondition(demand == .unlimited, "AnyPublisher.init only works with unlimited demand")
                        queue.async {
                            cancellable = closure(Subscriber(send: subject.send(_:), complete: subject.send(completion:)))
                        }
                    }
                )
        }
        .eraseToAnyPublisher()
    }
}

相关问题