Swift合并:在超时的情况下逐个运行publisher

ffvjumwh  于 2023-05-16  发布在  Swift
关注(0)|答案(1)|浏览(131)

bounty已结束。回答此问题可获得+50声望奖励。赏金宽限期4小时后结束。Daniel希望引起更多关注这个问题。

我想在发布第一个发布服务器后运行第二个发布服务器,但要设置超时。
就像这样:

publisher1.flatMap{ _ in
  publisher2.timeout(1, scheduler: RunLoop.main)
}
.handleEvents(receiveCancel: {

})
.sink { _ in

} receiveValue: { success in

}

(Note:publisher2是一个发布者,它在接收到一个套接字事件时发布,这并不能保证一直运行,所以我想在那里设置一个超时,这样即使有错误也可以完成上面的操作。
我该怎么做?

enxuqcxy

enxuqcxy1#

下面是一个Playground的源代码,它演示了我认为您正在尝试实现的功能,并有一些代码来练习该功能。
在代码中,publisher1publisher2PassthroughSubjects,这是一个简单的发布器,在示例中使用非常方便,因为您可以在任何时候通过它们发送值。
subscribeToPipelines中,代码构建了一个新的管道withTimeout,它从pipeline2获取输入,如果在1秒内没有发布任何内容,则会超时。当pipeline1通过flatMap接收值时,该管道将进行控制
withTimeout的结果由sink运算符捕获。代码处理withTimeout的可能条件,它可能已经正常完成,它可能已经超时,并且它可能接收一个值。print语句涵盖了所有情况。
代码使用Task来执行管道,因为Task.sleep是一种简单的延迟方式。第一个练习展示了如果publisher1接收到一个值,而publisher2在半秒后接收到一个值,会发生什么。第二个练习展示了如果publisher2延迟发布值1.5秒(超时)会发生什么。

import UIKit
import Combine

enum MyErrors: Error {
    case publisher2Timedout
}

let publisher1 = PassthroughSubject<Int, Error>()
let publisher2 = PassthroughSubject<Int, Error>()

func subscribeToPipelines() -> AnyCancellable {
    let withTimeout = publisher2.timeout(1,
                                         scheduler: RunLoop.main,
                                         customError: { return MyErrors.publisher2Timedout })

    let subscription = publisher1.flatMap { value1 in
        print("The first publisher's result was \(value1)")
        return withTimeout
    }
        .sink { completion in
            switch completion {
                case .finished:
                    print("The entire pipeline finished")
                case .failure(let error):
                    if case MyErrors.publisher2Timedout = error {
                        print("The second operation timed out")
                    }
            }
        } receiveValue: { value2 in
            print("The second publisher's result is \(value2)")
        }

    return subscription
}

Task {
    var subscription: AnyCancellable? =  subscribeToPipelines()
    // Try a case where the operations succeed
    publisher1.send(3)
    try await Task.sleep(for: .milliseconds(500))
    publisher2.send(4)

    subscription = nil;

    try await Task.sleep(for: .milliseconds(500))

    subscription =  subscribeToPipelines()
    publisher1.send(5)
    try  await Task.sleep(for: .milliseconds(1500))
    publisher2.send(6)

    subscription = nil;
}

相关问题