如何让一个异步Swift函数“@synchronized”?

plicqrtu  于 2023-05-16  发布在  Swift
关注(0)|答案(2)|浏览(221)

我想创建一个异步函数,它本身使用异步调用。我还希望确保在任何时刻都只处理一个呼叫。所以我想要一个async@synchronized函数。
如何做到这一点?将函数体 Package 在dispatchQueue.sync {}中不起作用,因为它需要同步代码。另外,似乎DispatchQueue一般不是设计来执行异步代码块/任务的。
这段代码与硬件通信,所以本质上是异步的,这就是为什么我想为我的库提供一个异步接口。(我不想在通信阶段阻止应用程序。)但是某些操作不能在硬件上并行执行,所以我必须通过同步,这样某些操作就不会同时发生。

pvabu6sv

pvabu6sv1#

我最初的回答(我建议等待前面的任务)如下。这是一个简单的模式,工作良好,但非结构化的并发(复杂的取消工作流)。
现在,我会使用AsyncSequence,例如AsyncStream。WWDC 2021视频Meet AsyncSequence或者,对于类似于队列的行为(我们首先设置队列,然后再向其中添加项),我通常会从Swift Async Algorithms包中获取AsyncChannel。WWDC 2022视频Meet Swift Async Algorithms
例如,我可以为我想要下载的URL创建一个AsyncChannel

let urls = AsyncChannel<URL>()

现在我有了一个通道,我可以设置一个任务来用for-await-in循环串行处理它们:

func processUrls() async {
    for await url in urls {
        await download(url)
    }
}

而且,当我稍后想要向该通道添加要处理的内容时,我可以将send添加到该通道:

func append(_ url: URL) async {
    await urls.send(url)
}

您可以让每个Task等待前一个。您可以使用actor确保一次只运行一个。诀窍在于,由于参与者可重入性,您必须将“await prior Task”逻辑放在同步方法中。
例如,您可以执行以下操作:

actor Experiment {
    private var previousTask: Task<Void, Error>?

    func startSomethingAsynchronous() {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            try await self.doSomethingAsynchronous()
        }
    }

    private func doSomethingAsynchronous() async throws {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Task", signpostID: id, "Start")
        try await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC)
        os_signpost(.end, log: log, name: "Task", signpostID: id, "End")
    }
}

现在我使用的是os_signpost,所以我可以从Xcode Instruments中看到这种串行行为。无论如何,你可以像这样开始三个任务:

import os.log

private let log = OSLog(subsystem: "Experiment", category: .pointsOfInterest)

class ViewController: NSViewController {

    let experiment = Experiment()

    func startExperiment() {
        for _ in 0 ..< 3 {
            Task { await experiment.startSomethingAsynchronous() }
        }
        os_signpost(.event, log: log, name: "Done starting tasks")
    }

    ... 
}

Instruments可以直观地演示顺序行为(其中向我们显示所有任务的提交完成位置),但您可以在时间轴上看到任务的顺序执行:

我实际上喜欢把这种串行行为抽象成它自己的类型:

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
    }
}

然后,需要此串行行为的异步函数将使用上述内容,例如:

class Experiment {
    let serialTasks = SerialTasks<Void>()

    func startSomethingAsynchronous() async {
        await serialTasks.add {
            try await self.doSomethingAsynchronous()
        }
    }

    private func doSomethingAsynchronous() async throws {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Task", signpostID: id, "Start")
        try await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC)
        os_signpost(.end, log: log, name: "Task", signpostID: id, "End")
    }
}
4c8rllxm

4c8rllxm2#

接受的答案确实会使任务串行运行,但它不会等待任务完成,也不会传播错误。我使用以下替代方法来支持捕获错误。

actor SerialTaskQueue<T> {
    private let dispatchQueue: DispatchQueue

    init(label: String) {
        dispatchQueue = DispatchQueue(label: label)
    }
    
    @discardableResult
    func add(block: @Sendable @escaping () async throws -> T) async throws -> T {
        try await withCheckedThrowingContinuation { continuation in
            dispatchQueue.sync {
                let semaphore = DispatchSemaphore(value: 0)

                Task {
                    defer {
                        semaphore.signal()
                    }

                    do {
                        let result = try await block()

                        continuation.resume(returning: result)
                    } catch {
                        continuation.resume(throwing: error)
                    }
                }

                semaphore.wait()
            }
        }
    }
}

相关问题