在将遗留代码转换为新的并发框架时,我发现了一些使用以下模式的示例:
- 线程调用函数时将闭包作为参数
- 函数检查其他函数(“实际工作”)是否已经在运行。
- 如果是,它将存储闭包,不做其他任何事情(因此“实际工作”函数不会并行运行多次)
- 如果没有,它将示例化另一个执行一些实际工作的异步函数
- 当“实际工作”函数完成时,它将执行运行时存储的所有回调。
一个非常简单的例子:
class Test {
private static let serial = DispatchQueue(label: "myQueue")
private var completions = [(Result<Void, Error>) -> Void]()
private var isRunning = false
func callMe(id: Int, completion: @escaping (Result<Void, Error>) -> Void) {
Test.serial.sync {
print("\(id) called")
completions.append(completion) // Preserve completion
if !isRunning {
print("\(id) is going to start doing something useful")
isRunning = true
doSomeAsyncStuff()
}
}
}
private func doSomeAsyncStuff() {
DispatchQueue.global().asyncAfter(deadline: .now() + 1.0) {
print("finished doing something useful")
Test.serial.sync {
self.isRunning = false
self.completions.forEach { completion in
completion(.success(()))
}
self.completions.removeAll()
}
}
}
}
例如,如果我这样做:
let test = Test()
DispatchQueue.concurrentPerform(iterations: 3) { i in
test.callMe(id: i) { _ in
print("\(i) is done")
}
}
我将得到预期的输出:
0 called
0 is going to start doing something useful
1 called
2 called
finished doing something useful
0 is done
1 is done
2 is done
也就是说,“实际工作”函数只执行一次,所有回调函数都在它完成后执行。
现在,尝试将其转换为新的并发性。一个简单的实现是:
actor NewTest {
private var isRunning = false
func callMe(id: Int) async throws {
print("\(id) called")
if !isRunning {
print("\(id) is going to start doing something useful")
isRunning = true
try await doSomeAsyncStuff()
}
}
private func doSomeAsyncStuff() async throws {
try await Task.sleep(nanoseconds: 1_000_000_000)
print("finished doing something useful")
self.isRunning = false
}
}
然而,这显然行不通:启动“实际工作”的第一个线程将暂停等待结果,为其他线程提供运行时间。因为他们没有等待任何事情,他们将过早地完成。示例:
let newTest = NewTest()
DispatchQueue.concurrentPerform(iterations: 3) { i in
Task {
try await newTest.callMe(id: i)
print("\(i) is done")
}
}
结果将是:
2 called
2 is going to start doing something useful
1 called
1 is done
0 called
0 is done
finished doing something useful
2 is done
显然不是我想要的。到目前为止,我只想到了“混合”的想法:
actor HybridTest {
private var completions = [(Result<Void, Error>) -> Void]()
private var isRunning = false
func callMe(id: Int) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) -> Void in
callMe(id: id) { result in
switch result {
case .success:
continuation.resume()
case .failure(let error):
continuation.resume(throwing: error)
}
}
}
}
// Back to old code
private func callMe(id: Int, completion: @escaping (Result<Void, Error>) -> Void) {
print("\(id) called")
completions.append(completion)
if !isRunning {
print("\(id) is going to start doing something useful")
isRunning = true
doSomeAsyncStuff()
}
}
// Back to old code
private func doSomeAsyncStuff() {
DispatchQueue.global().asyncAfter(deadline: .now() + 1.0) {
print("finished doing something useful")
Task {
await self.update(result: Result.success(()))
}
}
}
private func update(result: Result<Void, Error>) {
isRunning = false
completions.forEach { completion in
completion(result)
}
completions.removeAll()
}
}
但我想知道是否有更好的方法来实现同样的模式。先谢谢你了。
1条答案
按热度按时间sqxo8psd1#
Swift并发中的字面翻译需要保存一个
Task
,随后的调用可能是await
。这样就不需要isRunning
状态变量(因为如果有一个Task
,那么它正在运行)以及闭包数组(因为多个调用可以await
相同的Task
)。例如,沿着如下的东西:
然后:
产生:
顺便说一句,我使用了一个任务组来代替
concurrentPerform
,使用了一个Task.sleep
(这是一个非阻塞的“睡眠”)来代替dispatchAfter
。最好是让所有的GCD API退出,并保持在Swift并发的范围内。为了将来的读者,也许更优雅的解决方案是
AsyncSequence
,特别是AsyncStream
和.bufferingOldest
的bufferingPolicy
:现在,这并没有解决OP希望后续调用等待较早的
doSomeAsyncStuff
的问题,但这确实使您处于结构化并发的世界中(因此取消会自动传播)。或者,您可以使用Swift Async Algorithms包中的
AsyncChannel
,以及.bufferingOldest
的.buffer(policy:)
,这进一步简化了这一点,但仍然保留了结构化并发:使用
AsyncChannel
,调用者将await
for
-async
-await
循环来获取发送的值,但它不会await
doSomeAsyncStuff
。因此,这并不能满足OP对await
的要求,但它是我的“go to”模式,用于异步序列的缓冲处理。