了解Swift中的任务和任务组取消

iyfjxgzm  于 2023-08-02  发布在  Swift
关注(0)|答案(1)|浏览(180)

在Swift并发中,当任务被取消时,子任务和任务组也被隐式取消,according to Apple's documentationWWDC talks。但是,当我在TaskGroup中实现工作并取消其父任务时,我无法让它识别它已被取消。
下面是一个简短的例子来说明这个问题。
函数start启动一个Task,它启动一个TaskGroup,并每2秒向其中添加一个任务。两秒钟后,cancel()在Task上被调用。我希望有一两个结果值被标记为未完成。然而,即使在取消之后,他们都被标记为完成,这意味着他们没有尊重addTaskUnlessCancelled

public struct Thing {
    let completed: Bool
    let value: Int?
}

public struct TaskGroupCancellation {
    public mutating func start() async {
        let task = Task {
            let things = await withTaskGroup(of: Thing.self, returning: [Thing].self, body: { group in
                print("withTaskGroup start")
                var t = [Thing]()
                for i in 0..<3 {
                    let isCancelled = group.isCancelled
                    let added = group.addTaskUnlessCancelled {
                        print("Adding value \(i) (canceled?: \(isCancelled))")
                        return Thing(completed: true, value: i)
                    }
                    if !added {
                        print("Marking value \(i) as cancelled")
                        t.append(Thing(completed: false, value: nil))
                    }
                    try? await Task.sleep(for: .seconds(2))
                }

                for await thing in group {
                    t.append(thing)
                }
                print("withTaskGroup end")

                return t
            })

            print("Things: \(things)")
        }

        try? await Task.sleep(for: .seconds(2))
        task.cancel()
        print("Cancelled")
    }
}

字符串
运行上面的命令会产生以下输出:

withTaskGroup start
Adding value 0 (canceled?: false)
Cancelled
Adding value 2 (canceled?: false)
Adding value 1 (canceled?: false)
withTaskGroup end
Things: [TaskGroupCancellation.Thing(completed: true, value: Optional(0)), TaskGroupCancellation.Thing(completed: true, value: Optional(2)), TaskGroupCancellation.Thing(completed: true, value: Optional(1))]


我希望任务组的addTaskUnlessCancelled在取消后不会被输入,但它似乎仍然是。我理解错了吗?

3hvapo4f

3hvapo4f1#

我遇到了同样的TaskGroup行为。
TaskGroupdocumentation for isCancelled说道:
如果取消了当前正在运行该组的任务,则该组也会隐式取消,这也反映在此属性的值中。
关于更广泛的任务取消将“反映在这个属性的价值”的警告并不是我的经验。我发现,如果父任务被取消,组也被取消,但是group.isCancelled值和addTaskUnlessCancelled方法都没有反映这一点。
为了解决这个问题,您可以使用try Task.checkCancellation()(或检查Task.isCancelled),因为这显然正确地表示了该任务是否已被取消。
或者,通常不需要做任何事情,只需要让取消的自动传播来处理一切。例如,考虑:

import os.log

let poi = OSLog(subsystem: "Test", category: .pointsOfInterest)

func experimentCancelingParentTask() async throws {
    let task = Task {
        await withThrowingTaskGroup(of: Void.self) { group in
            let id = OSSignpostID(log: poi)
            os_signpost(.begin, log: poi, name: "group", signpostID: id)

            defer {
                os_signpost(.end, log: poi, name: "group", signpostID: id, "group.isCancelled = %s; Task.isCancelled = %s",  group.isCancelled.description, Task.isCancelled.description)
            }

            for await value in tickSequence() {
                os_signpost(.event, log: poi, name: "tick", "%d", value)
                group.addTask { try await performTask(with: value) }
            }
        }
    }

    try await Task.sleep(for: .seconds(4.5))
    os_signpost(.event, log: poi, name: "cancel")
    task.cancel()
}

字符串
如果我使用“兴趣点”工具在仪器中对此进行分析,我将看到:


的数据
请注意,group.isCancelledfalse,而Task.isCancelledtrue。但是,无论如何,当任务组被取消时(最后一个路标),正在进行的任务也被取消,甚至这个任务组正在迭代的AsyncSequence也被取消。
以上所述,值得注意的是,group.cancelAll()导致由group.isCancelledaddTaskUnlessCancelled反映的群抵消状态。我知道group.cancelAll()不是当前的问题,但我想指出的是,这个组取消逻辑基本上是可以工作的,只是没有反映父任务的取消状态。
但如上所示,无论如何,这通常是不需要的。
这不是非常相关,但这里有上面代码片段使用的一些函数:

// just a placeholder function to perform a task (and log stuff for “Points of Interest”); 
// the only salient detail is that this supports cancelation

func performTask(with value: Int) async throws {
    let id = OSSignpostID(log: poi)
    os_signpost(.begin, log: poi, name: "task", signpostID: id, "%d", value)

    defer {
        os_signpost(.end, log: poi, name: "task", signpostID: id, "%d", value)
    }

    try await Task.sleep(for: .seconds(1.75))
}

// a sequence of 12 values yielded one per second

func tickSequence() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let task = Task {
            for i in 0 ..< 12 {
                continuation.yield(i)
                try await Task.sleep(for: .seconds(1))
            }
            continuation.finish()
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

相关问题