Swift并发任务与调度队列线程:什么决定了有多少任务同时运行?

jv4diomz  于 2023-04-19  发布在  Swift
关注(0)|答案(1)|浏览(169)

为了这个雨燕Playground(view on swiftfiddle或更低版本),我有三个进程密集型工作循环。(通过最后报告的循环数量的增加来显示),并且具有并发性的那些将只会同时执行两个任务。是什么决定了有多少线程在其中运行?正如我在评论中提到的,一种方法不一定比另一种方法快,因为50个线程竞争资源远非理想,但2似乎是一个非常低的数字。

import PlaygroundSupport
import UIKit

PlaygroundPage.current.needsIndefiniteExecution = true

func asyncProcess(_ this:Int) async{
    print("   async -- starting \(this)")
    let x = Int.random(in: 1000...10000000)
    await withUnsafeContinuation{
        for _ in 1..<x {}
        $0.resume()
    }
    print("   async -- ending \(this) after \(x) times")
}

func threadedProcess(_ this:Int) {
    print("threaded -- starting \(this) on \(Thread.current)")
    let x = Int.random(in: 1000...10000000)
    for _ in 1..<x {}
    print("threaded -- ending \(this) after \(x) times on \(Thread.current)")
}

/// This will dispatch all 50 at once (if running w/out other tasks), and they all run together (which you can see because the ending statements print in ascending order of the number of times the loop runs
func doThreaded(){
    for x in 1...50 {
        DispatchQueue.global(qos: .background).async { threadedProcess(x) }
    }
}

/// This only ever runs two tasks at the same time
func doUngroupedTasks(){
    print ("starting ungrouped tasks")
    for x in 1...50 {
        Task.detached { await asyncProcess(x) }
    }
    print ("ending ungrouped tasks")
}

/// This is no different than the above
func doGroupedTasks() async{
    print ("starting grouped tasks")
    await withTaskGroup(of: Void.self){
        for x in 51...100 {
            $0.addTask(priority: .background) {
                await asyncProcess(x)
            }
        }
    }
    print ("ending grouped tasks")
}

// comment out here as you see fit
doThreaded()
doUngroupedTasks()
await doGroupedTasks()

vjhs03f7

vjhs03f71#

在Xcode 14.2或更早版本中,Swift并发在模拟器上的“协作线程池”是受限制的。(参见Maximum number of threads with async-await task groups。)但是,如果您在物理设备上运行它,您将在GCD的concurrentPerform中看到与Swift并发任务组相当的并行度。
关于这三个备选方案:
1.在doThreaded中,使用DispatchQueue,您将调度50个项目,这将占用非常有限的工作线程池中的50个线程(上次我检查时,每个QoS只有64个线程)。如果您完全耗尽工作线程池(称为“线程爆炸”),则可能导致各种问题。
我强烈建议你避免无限制地向并发队列分派任务(特别是如果这些任务可能很慢的话)。相反,我们会使用concurrentPerform来享受最大的并发性,同时避免线程爆炸:

func doThreaded() {
    DispatchQueue.global(qos: .utility).async {
        DispatchQueue.concurrentPerform(iterations: 50) { x in
            self.threadedProcess(#function, x)
        }
    }
}

这也享受了最大的并发性(甚至比直接的、无约束的异步调度到全局队列时还要多),但避免了线程爆炸。
无论哪种方式,GCD中的实际最大并发性都受限于CPU核心的能力。
1.在doUngroupedTasks中,您使用的是非结构化并发(并且不等待它们返回)。在它们实际完成之前,您可能会看到“结束未分组的任务”消息。
在这种情况下,最大并发量被限制为协作线程池的大小。在设备上,这将是CPU核心的数量。在模拟器上,在Xcode 14.2或更早版本中,这可能是artificially constrained。但在物理设备上,它使用所有CPU核心。
1.在doGroupedTasks中,您可以享受与前面提到的相同程度的并发(即,CPU的能力最大化,除了在14.3之前的Xcode版本中的模拟器上运行时)。但是任务组消除了前面的非结构化并发示例引入的所有问题(知道它们何时完成,享受取消传播等)。
其他一些观察:

  • 我会避免立即连续调用这些任务。doThreadeddoUngroupedTasks并没有等待异步任务完成,因此前面的实验会影响后面的实验。
  • 我会避免使用模拟器或Playground进行这些实验。它们会引入额外的约束和限制,这会扭曲你的结果。
  • 你的任务在Playground中可能会很慢(运行得非常慢),如果你把它移动到一个应用程序,特别是发布版本,这些循环太短了,无法可靠地观察并发性。就个人而言,对于这样的演示,我会循环一段时间,这样无论环境或硬件如何,我都有可重复的结果。
  • 你可能需要注意的是,你用来测试Swift并发性的方法不是actor-isolated的。作为顶级函数,它们不会是actor isolated的,但是如果你把它们放在一个actor-isolated类型中,你需要显式地将它标记为nonisolated。你可能不希望actor-isolation扭曲你的并行性实验。

FWIW,这里有一个“兴趣点”间隔的例子,我可以分析(命令-i或“产品”»“配置文件”)并使用仪器中的“时间分析器”模板进行观察:

import Cocoa
import os.log

private let poi = OSLog(subsystem: "Test", category: .pointsOfInterest)

class ViewController: NSViewController {

    nonisolated func threadedProcess(_ message: StaticString, _ this: Int) {
        let id = OSSignpostID(log: poi)
        os_signpost(.begin, log: poi, name: #function, signpostID: id, message)

        let start = CACurrentMediaTime()
        while CACurrentMediaTime() - start < 1 { }

        os_signpost(.end, log: poi, name: #function, signpostID: id)
    }

    let iterations = 50

    @IBAction func didTapThreaded(_ sender: Any) {
        DispatchQueue.global(qos: .utility).async {
            DispatchQueue.concurrentPerform(iterations: self.iterations) { x in
                self.threadedProcess(#function, x)
            }
        }
    }

    @IBAction func didTapUngrouped(_ sender: Any) {
        for x in 0 ..< iterations {
            Task.detached { self.threadedProcess(#function, x) }
        }
    }

    @IBAction func didTapGrouped(_ sender: Any) {
        Task {
            await withTaskGroup(of: Void.self) { group in
                for x in 0 ..< iterations {
                    group.addTask(priority: .background) {
                        self.threadedProcess(#function, x)
                    }
                }
            }
        }
    }
}

这是didTapThreadeddidTapUngroupeddidTapGrouped的结果。正如你所看到的,它们都享有同等的并发性,最大限度地利用了我的Mac硬件:

底线是,在测试这类行为时,避免使用Playground和模拟器。

相关问题