Swift: parallelism within a concurrentPerform closure

fquxozlt  posted on 2022-12-02  in  Swift

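I would like to implement concurrency in my app to speed up processing. The input array can be large, and I need to check multiple things related to it. Here is some sample code.
Edit: The answers are helpful for seeing how to stride through the array, which is another thing I was considering, but I think they are drifting away from the original question, since the code already contains a DispatchQueue.concurrentPerform.
Inside a for loop, I have repeatedly tried to run other for loops, because the same data has to be re-examined multiple times. inputArray is an array of structs, so in the outer loop I look at one value in the struct, and in the inner loops I look at a different value in the struct. In the revision below, I turned the two inner for loops into function calls to make the code clearer. In general, though, I would make the two calls, funcA and funcB, and wait until both are finished before continuing with the main loop.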

// Assume the start and end values are within the bounds of the array
// and won't under/overflow.
private func funcA(inputArray: [Int], startValue: Int, endValue: Int) -> Bool{
    for index in startValue...endValue {
        let dataValue = inputArray[index]
        
        if dataValue == 1_000_000 {
            return true
        }
    }
    return false
}

private func funcB(inputArray: [Int], startValue: Int, endValue: Int) -> Bool{
    for index in startValue...endValue {
        let dataValue = inputArray[index]
        
        if dataValue == 10 {
            return true
        }
    }
    return false
}

private func testFunc(inputArray: [Int]) {
    let dataIterationArray = Array(Set(inputArray))
    let syncQueue = DispatchQueue(label: "syncQueue")
    DispatchQueue.concurrentPerform(iterations: dataIterationArray.count) { index in
                
        //I want to do these two function calls starting roughly one after another,
        //to work them in parallel, but i want to wait until both are complete before
        //moving on. funcA is going to take much longer than funcB in this case,
        //just because there are more values to check.
        let funcAResult = funcA(inputArray: dataIterationArray, startValue: 10, endValue: 2_000_000)
        let funcBResult = funcB(inputArray: dataIterationArray, startValue: 5, endValue: 9)
        //Wait for both above to finish before continuing
        
        if funcAResult && funcBResult {
            print("Yup we are good!")
        } else {
            print("Nope")
        }
        
        //And then wait here until all of the loops are done before processing
    }
}
kcwpcxri #1

In your revised question, you contemplated a concurrentPerform loop where each iteration called funcA and then funcB and suggested that you wanted them “to work them in parallel”.
Unfortunately, that is not how concurrentPerform works. It runs the separate iterations in parallel, but the code within the closure should be synchronous and run sequentially. If the closure introduces additional parallelism, that will adversely affect how concurrentPerform can reason about how many worker threads it should use.
Before we consider some alternatives, let us see what will happen if funcA and funcB remain synchronous. In short, you will still enjoy parallel execution benefits.
Below, I logged this with “Points of Interest” intervals in Instruments, and you will see that funcA (in green) never runs concurrently with funcB (in purple) for the same iteration (i.e., for the same range of start and end indices). In this example, I am processing an array with 180 items, striding 10 items at a time, ending up with 18 iterations running on an iPhone 12 Pro Max with six cores:

But, as you can see, although funcB for a given range of indices will not start until funcA finishes for the same range of indices, it does not really matter, because we are still enjoying full parallelism on the device, taking advantage of all the CPU cores.
I contend that, given that we are already enjoying parallelism, there is little benefit in making funcA and funcB run concurrently with respect to each other, too. Just let the individual iterations run parallel to each other, but let A and B run sequentially, and call it a day.
If you really want to have funcA and funcB run parallel with each other, as well, you will need to consider a different pattern. The concurrentPerform simply is not designed for launching parallel tasks that, themselves, are asynchronous. You could consider:

  • Have concurrentPerform launch, using my example, 36 iterations, half of which do funcA and half of which do funcB.
  • Or you might consider using OperationQueue with a reasonable maxConcurrentOperationCount (but you do not enjoy the dynamic limitation of the degree of concurrency to the device’s CPU cores).
  • Or you might use an async-await task group, which will limit itself to the cooperative thread pool.
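
As a rough illustration of the first option, the sketch below doubles the iteration count so that even iterations perform an "A" check and odd iterations perform a "B" check on the same chunk. The names chunkContains and checkBoth, their parameters, and the idea that both checks are simple value searches are assumptions made for illustration; they are not from the question. It also assumes the Swift 5 language mode, where mutating captured variables under a serial queue compiles without strict concurrency errors.

```swift
import Dispatch

// Hypothetical stand-in for the question's funcA/funcB: scans one chunk
// of the array for a target value.
func chunkContains(_ target: Int, in array: [Int], range: Range<Int>) -> Bool {
    array[range].contains(target)
}

// Runs twice as many iterations as there are chunks: even iterations do
// the "A" check for a chunk, odd iterations do the "B" check for it.
func checkBoth(array: [Int], chunkSize: Int, targetA: Int, targetB: Int) -> (a: Bool, b: Bool) {
    precondition(chunkSize > 0, "chunkSize must be positive")
    let chunkCount = (array.count + chunkSize - 1) / chunkSize
    let syncQueue = DispatchQueue(label: "syncQueue") // guards the two flags
    var foundA = false
    var foundB = false

    DispatchQueue.concurrentPerform(iterations: chunkCount * 2) { iteration in
        let start = (iteration / 2) * chunkSize
        let end = min(start + chunkSize, array.count)
        let isA = iteration.isMultiple(of: 2)

        // Each iteration is still plain synchronous code.
        let found = chunkContains(isA ? targetA : targetB, in: array, range: start ..< end)

        // Synchronize only when there is something to record.
        if found {
            syncQueue.sync {
                if isA { foundA = true } else { foundB = true }
            }
        }
    }

    // concurrentPerform has returned, so every iteration is finished.
    return (foundA, foundB)
}
```

With 180 items and a chunk size of 10, this yields the 36 iterations mentioned above. The A and B work for one chunk may now overlap, yet the closure itself never launches anything asynchronous.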

But you will not want to have concurrentPerform have a closure that launches asynchronous tasks or introduces additional parallel execution.
And, as I discuss below, the example provided in the question is not a good candidate for parallel execution. Mere tests of equality are not computationally intensive enough to enjoy parallelism benefits. It will undoubtedly just be slower than the serial pattern.
My original answer, below, outlines the basic concurrentPerform considerations.
The basic idea is to “stride” through the values. So calculate how many “iterations” are needed and calculate the “start” and “end” index for each iteration:

private func testFunc(inputArray: [Int]) {
    DispatchQueue.global().async {
        let array = Array(Set(inputArray))
        let syncQueue = DispatchQueue(label: "syncQueue")

        // calculate how many iterations will be needed

        let count = array.count
        let stride = 10
        let (quotient, remainder) = count.quotientAndRemainder(dividingBy: stride)
        let iterations = remainder == 0 ? quotient : quotient + 1

        // now iterate

        DispatchQueue.concurrentPerform(iterations: iterations) { iteration in

            // calculate the `start` and `end` indices

            let start = stride * iteration
            let end = min(start + stride, count)

            // now loop through that range

            for index in start ..< end {
                let value = array[index]
                print("iteration =", iteration, "index =", index, "value =", value)
            }
        }

        // you won't get here until they're all done; obviously, if you 
        // want to now update your UI or model, you may want to dispatch
        // back to the main queue, e.g.,
        //
        // DispatchQueue.main.async { 
        //     ...
        // }
    }
}

Note, if something is so slow that it merits concurrentPerform, you probably want to dispatch the whole thing to a background queue, too. Hence the DispatchQueue.global().async {…} shown above. You would probably want to add a completion handler to this method, now that it runs asynchronously, but I will leave that to the reader.
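
For readers who want a starting point, here is one possible shape for such a completion handler. It is only a sketch: the function name countMatches, the predicate parameter, and the completionQueue parameter (defaulting to the main queue) are hypothetical and not part of the original answer, and it assumes the Swift 5 language mode.

```swift
import Dispatch

// Counts the distinct values that satisfy `predicate`, off the calling
// thread, and reports the total on `completionQueue`.
func countMatches(
    in inputArray: [Int],
    stride: Int = 10,
    completionQueue: DispatchQueue = .main,
    matching predicate: @escaping (Int) -> Bool,
    completion: @escaping (Int) -> Void
) {
    precondition(stride > 0, "stride must be positive")

    DispatchQueue.global().async {
        let array = Array(Set(inputArray))
        let count = array.count
        let iterations = (count + stride - 1) / stride
        let syncQueue = DispatchQueue(label: "syncQueue")
        var total = 0

        DispatchQueue.concurrentPerform(iterations: iterations) { iteration in
            let start = stride * iteration
            let end = min(start + stride, count)
            let localCount = array[start ..< end].filter(predicate).count
            syncQueue.sync { total += localCount }
        }

        // All iterations are done here; hand an immutable copy to the caller.
        let result = total
        completionQueue.async { completion(result) }
    }
}
```

A caller would typically keep the default main queue and update the UI inside the completion closure. Passing a different queue is mostly useful in command-line tools or tests, where the main queue may not be serviced.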
Needless to say, there are quite a few additional considerations:

  • The stride should be large enough to ensure there is enough work on each iteration to offset the modest overhead introduced by multithreading. Some experimentation is often required to empirically determine the best striding value.
  • The work done in each thread must be significant (again, to justify the multithreading overhead). I.e., simply printing values is obviously not enough. (Worse, print statements compound the problem by introducing a hidden synchronization.) Even building a new array with some simple calculation will not be sufficient. This pattern really only works if you are doing something very computationally intensive.
  • You have a “sync” queue, which suggests that you understand that you need to synchronize the combination of the results of the various iterations. That is good. I will point out, though, that you will want to minimize the total number of synchronizations you do. E.g. let’s say you have 1000 values and you end up doing 10 iterations, each striding through 100 values. You generally want to have each iteration build a local result and do a single synchronization for each iteration. Using my example, you should strive to end up with only 10 total synchronizations, not 1000 of them, otherwise excessive synchronization can easily negate any performance gains.
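
The last point can be sketched as follows, using a sum of squares as a placeholder for the real work. The function name sumOfSquares and the returned synchronization counter are assumptions added purely to make the number of synchronizations visible; the Swift 5 language mode is assumed.

```swift
import Dispatch

// Each iteration accumulates into a local variable and touches the shared
// state exactly once, so the number of synchronizations equals the number
// of iterations rather than the number of values.
func sumOfSquares(_ values: [Int], stride: Int) -> (sum: Int, synchronizations: Int) {
    precondition(stride > 0, "stride must be positive")
    let count = values.count
    let iterations = (count + stride - 1) / stride
    let syncQueue = DispatchQueue(label: "syncQueue")
    var sum = 0
    var synchronizations = 0

    DispatchQueue.concurrentPerform(iterations: iterations) { iteration in
        let start = stride * iteration
        let end = min(start + stride, count)

        // Local result: no locking needed while building it.
        var localSum = 0
        for index in start ..< end {
            localSum += values[index] * values[index]
        }

        // A single synchronization per iteration.
        syncQueue.sync {
            sum += localSum
            synchronizations += 1
        }
    }

    return (sum, synchronizations)
}
```

Calling sumOfSquares(Array(1...1000), stride: 100) performs 10 synchronizations, matching the 1000-values, 10-iterations example above. As the earlier points warn, work this trivial is unlikely to beat a serial loop; the sketch only shows the synchronization pattern.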

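Bottom line, parallelizing a routine is complicated, and you can easily find that the result is actually slower than the serial rendition. Some processes simply do not lend themselves to parallel execution. We obviously cannot comment further without understanding what your processes entail. Sometimes other technologies, such as Accelerate or Metal, can achieve better results.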

kqqjbcuj #2

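I will explain it here, since a comment is too small, but I will delete it later if it does not answer the question.
Instead of looping over iterations: dataIterationArray.count, base the number of iterations on the number of parallel work streams you want, not on the array size. For example, as you mentioned, you want 3 work streams, so you should have 3 iterations, each handling an independent part of the work: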

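import Dispatch

// One iteration per work stream; each case handles its own range.
DispatchQueue.concurrentPerform(iterations: 3) { iteration in
    switch iteration {
    case 0:
        for i in 1...10 {
            print("i \(i)")
        }
    case 1:
        for j in 11...20 {
            print("j \(j)")
        }
    case 2:
        for k in 21...30 {
            print("k \(k)")
        }
    default:
        // Required because a switch over Int must be exhaustive;
        // never reached with iterations: 3.
        break
    }
}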

And the “then wait here until all of the loops are done before processing” part will happen automatically; that is guaranteed by concurrentPerform.
