swift 并发模式等效于运行一次,等待所有线程完成

new9mtju  于 2023-06-28  发布在  Swift
关注(0)|答案(1)|浏览(125)

在将遗留代码转换为新的并发框架时,我发现了一些使用以下模式的示例:

  • 线程调用函数时将闭包作为参数
  • 函数检查其他函数(“实际工作”)是否已经在运行。
  • 如果是,它将存储闭包,不做其他任何事情(因此“实际工作”函数不会并行运行多次)
  • 如果没有,它将示例化另一个执行一些实际工作的异步函数
  • 当“实际工作”函数完成时,它将执行运行时存储的所有回调。

一个非常简单的例子:

class Test {

    private static let serial = DispatchQueue(label: "myQueue")

    private var completions = [(Result<Void, Error>) -> Void]()

    private var isRunning = false

    func callMe(id: Int, completion: @escaping (Result<Void, Error>) -> Void) {

        Test.serial.sync {

            print("\(id) called")
            completions.append(completion) // Preserve completion

            if !isRunning {
                print("\(id) is going to start doing something useful")
                isRunning = true
                doSomeAsyncStuff()
            }
        }
    }

    private func doSomeAsyncStuff() {

        DispatchQueue.global().asyncAfter(deadline: .now() + 1.0) {

            print("finished doing something useful")

            Test.serial.sync {
                self.isRunning = false
                self.completions.forEach { completion in
                    completion(.success(()))
                }
                self.completions.removeAll()
            }
        }
    }
}

例如,如果我这样做:

let test = Test()

DispatchQueue.concurrentPerform(iterations: 3) { i in
    test.callMe(id: i) { _ in
        print("\(i) is done")
    }
}

我将得到预期的输出:

0 called
0 is going to start doing something useful
1 called
2 called
finished doing something useful
0 is done
1 is done
2 is done

也就是说,“实际工作”函数只执行一次,所有回调函数都在它完成后执行。
现在,尝试将其转换为新的并发性。一个简单的实现是:

actor NewTest {
    
    private var isRunning = false
    
    func callMe(id: Int) async throws {
        
        print("\(id) called")
        
        if !isRunning {
            print("\(id) is going to start doing something useful")
            isRunning = true
            try await doSomeAsyncStuff()
        }
    }
    
    private func doSomeAsyncStuff() async throws {
        
        try await Task.sleep(nanoseconds: 1_000_000_000)
        print("finished doing something useful")
        self.isRunning = false
    }
}

然而,这显然行不通:启动“实际工作”的第一个线程将暂停等待结果,为其他线程提供运行时间。因为他们没有等待任何事情,他们将过早地完成。示例:

let newTest = NewTest()

DispatchQueue.concurrentPerform(iterations: 3) { i in
    Task {
        try await newTest.callMe(id: i)
        print("\(i) is done")
    }
}

结果将是:

2 called
2 is going to start doing something useful
1 called
1 is done
0 called
0 is done
finished doing something useful
2 is done

显然不是我想要的。到目前为止,我只想到了“混合”的想法:

actor HybridTest {
    
    private var completions = [(Result<Void, Error>) -> Void]()
    
    private var isRunning = false
    
    func callMe(id: Int) async throws {
        
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) -> Void in

            callMe(id: id) { result in
                
                switch result {
                case .success:
                    continuation.resume()
                case .failure(let error):
                    continuation.resume(throwing: error)
                }
            }
        }
    }
    
    // Back to old code
    private func callMe(id: Int, completion: @escaping (Result<Void, Error>) -> Void) {
        
        print("\(id) called")
        completions.append(completion)
        
        if !isRunning {
            print("\(id) is going to start doing something useful")
            isRunning = true
            doSomeAsyncStuff()
        }
    }
    
    // Back to old code
    private func doSomeAsyncStuff() {

        DispatchQueue.global().asyncAfter(deadline: .now() + 1.0) {

            print("finished doing something useful")

            Task {
                await self.update(result: Result.success(()))
            }
        }
    }
    
    private func update(result: Result<Void, Error>) {
        isRunning = false
        completions.forEach { completion in
            completion(result)
        }
        completions.removeAll()
    }
}

但我想知道是否有更好的方法来实现同样的模式。先谢谢你了。

sqxo8psd

sqxo8psd1#

Swift并发中的字面翻译需要保存一个Task,随后的调用可能是await。这样就不需要isRunning状态变量(因为如果有一个Task,那么它正在运行)以及闭包数组(因为多个调用可以await相同的Task)。
例如,沿着如下的东西:

actor Test {
    var task: Task<Void, Never>?

    func callMe(id: Int) async {
        print("\(id) called")

        if let task {
            _ = await task.value
            return
        }

        task = Task {
            print("\(id) is going to start doing something useful")
            await doSomeAsyncStuff()
            task = nil
        }

        _ = await task?.value
    }

    func doSomeAsyncStuff() async {
        try? await Task.sleep(for: .seconds(1))
        print("finished doing something useful")
    }
}

然后:

func test() async {
    let test = Test()

    await withTaskGroup(of: Void.self) { group in
        for i in 0 ..< 3 {
            group.addTask {
                await test.callMe(id: i)
                print("\(i) is done")
            }
        }
    }
}

产生:

0 called
1 called
2 called
0 is going to start doing something useful
finished doing something useful
2 is done
0 is done
1 is done

顺便说一句,我使用了一个任务组来代替concurrentPerform,使用了一个Task.sleep(这是一个非阻塞的“睡眠”)来代替dispatchAfter。最好是让所有的GCD API退出,并保持在Swift并发的范围内。
为了将来的读者,也许更优雅的解决方案是AsyncSequence,特别是AsyncStream.bufferingOldestbufferingPolicy

actor TestWithSequence {
    func iterate() async {
        for await value in sequence() {
            print("\(value) is going to start doing something useful")
            await doSomeAsyncStuff()
        }
    }

    private var block: ((Int) -> Void)?

    func sequence() -> AsyncStream<Int> {
        AsyncStream(bufferingPolicy: .bufferingOldest(1)) { contination in
            block = { value in
                contination.yield(value)
            }
            contination.onTermination = { _ in
                Task { await self.reset() }
            }
        }
    }

    func callMe(id: Int) async {
        print("\(id) called")
        block?(id)
    }
}

private extension TestWithSequence {
    func reset() {
        block = nil
    }

    func doSomeAsyncStuff() async {
        try? await Task.sleep(for: .seconds(1))
        print("finished doing something useful")
    }
}

现在,这并没有解决OP希望后续调用等待较早的doSomeAsyncStuff的问题,但这确实使您处于结构化并发的世界中(因此取消会自动传播)。
或者,您可以使用Swift Async Algorithms包中的AsyncChannel,以及.bufferingOldest.buffer(policy:),这进一步简化了这一点,但仍然保留了结构化并发:

actor TestWithChannel {
    private let channel = AsyncChannel<Int>()

    func iterate() async {
        for await value in channel.buffer(policy: .bufferingOldest(1)) {
            await doSomeAsyncStuff(value)
        }
    }

    func callMe(id: Int) async {
        print("\(id) called")
        await channel.send(id)
        print("\(id) finished")
    }
}

private extension TestWithChannel {
    func doSomeAsyncStuff(_ value: Int) async {
        try? await Task.sleep(for: .seconds(1))
        print("finished doing something useful with \(value)")
    }
}

使用AsyncChannel,调用者将awaitfor-async-await循环来获取发送的值,但它不会awaitdoSomeAsyncStuff。因此,这并不能满足OP对await的要求,但它是我的“go to”模式,用于异步序列的缓冲处理。

相关问题