@@ -27,29 +27,23 @@ extension Task: AnyTask {
2727}
2828
2929/// A type that is able to track dependencies between tasks.
30- package protocol DependencyTracker : Sendable {
31- /// Which tasks need to finish before a task described by `self` may start executing.
32- /// `pendingTasks` is sorted in the order in which the tasks were enqueued to `AsyncQueue`.
33- func dependencies( in pendingTasks: [ PendingTask < Self > ] ) -> [ PendingTask < Self > ]
30+ package protocol DependencyTracker : Sendable , Hashable {
31+ /// Whether the task described by `self` needs to finish executing before `other` can start executing.
32+ func isDependency( of other: Self ) -> Bool
3433}
3534
3635/// A dependency tracker where each task depends on every other, i.e. a serial
3736/// queue.
3837package struct Serial : DependencyTracker {
39- package func dependencies( in pendingTasks: [ PendingTask < Self > ] ) -> [ PendingTask < Self > ] {
40- if let lastTask = pendingTasks. last {
41- return [ lastTask]
42- }
43- return [ ]
38+ package func isDependency( of other: Serial ) -> Bool {
39+ return true
4440 }
4541}
4642
47- package struct PendingTask < TaskMetadata: Sendable > : Sendable {
43+ package struct PendingTask < TaskMetadata: Sendable & Hashable > : Sendable {
4844 /// The task that is pending.
4945 fileprivate let task : any AnyTask
5046
51- package let metadata : TaskMetadata
52-
5347 /// A unique value used to identify the task. This allows tasks to get
5448 /// removed from `pendingTasks` again after they finished executing.
5549 fileprivate let id : UUID
@@ -58,23 +52,25 @@ package struct PendingTask<TaskMetadata: Sendable>: Sendable {
5852/// A list of pending tasks that can be sent across actor boundaries and is guarded by a lock.
5953///
6054/// - Note: Unchecked sendable because the tasks are being protected by a lock.
61- private class PendingTasks < TaskMetadata: Sendable > : @ unchecked Sendable {
55+ private final class PendingTasks < TaskMetadata: Sendable & Hashable > : Sendable {
6256 /// Lock guarding `pendingTasks`.
6357 private let lock = NSLock ( )
6458
6559 /// Pending tasks that have not finished execution yet.
6660 ///
6761 /// - Important: This must only be accessed while `lock` has been acquired.
68- private var tasks : [ PendingTask < TaskMetadata > ] = [ ]
62+ private nonisolated ( unsafe ) var tasksByMetadata : [ TaskMetadata : [ PendingTask < TaskMetadata > ] ] = [ : ]
6963
7064 init ( ) {
7165 self . lock. name = " AsyncQueue "
7266 }
7367
7468 /// Capture a lock and execute the closure, which may modify the pending tasks.
75- func withLock< T> ( _ body: ( _ pendingTasks: inout [ PendingTask < TaskMetadata > ] ) throws -> T ) rethrows -> T {
69+ func withLock< T> (
70+ _ body: ( _ tasksByMetadata: inout [ TaskMetadata : [ PendingTask < TaskMetadata > ] ] ) throws -> T
71+ ) rethrows -> T {
7672 try lock. withLock {
77- try body ( & tasks )
73+ try body ( & tasksByMetadata )
7874 }
7975 }
8076}
@@ -122,10 +118,29 @@ package final class AsyncQueue<TaskMetadata: DependencyTracker>: Sendable {
122118 ) -> Task < Success , any Error > {
123119 let id = UUID ( )
124120
125- return pendingTasks. withLock { tasks in
121+ return pendingTasks. withLock { tasksByMetadata in
126122 // Build the list of tasks that need to finished execution before this one
127123 // can be executed
128- let dependencies = metadata. dependencies ( in: tasks)
124+ var dependencies : [ PendingTask < TaskMetadata > ] = [ ]
125+ for (pendingMetadata, pendingTasks) in tasksByMetadata {
126+ guard pendingMetadata. isDependency ( of: metadata) else {
127+ // No dependency
128+ continue
129+ }
130+ if metadata. isDependency ( of: metadata) , let lastPendingTask = pendingTasks. last {
131+ // This kind of task depends on all other tasks of the same kind finishing. It is sufficient to just wait on
132+ // the last task with this metadata, it will have all the other tasks with the same metadata as transitive
133+ // dependencies.
134+ dependencies. append ( lastPendingTask)
135+ } else {
136+ // We depend on tasks with this metadata, but they don't have any dependencies between them, eg.
137+ // `documentUpdate` depends on all `documentRequest` but `documentRequest` don't have dependencies between
138+ // them. We need to depend on all of them unless we knew that we depended on some other task that already
139+ // depends on all of these. But determining that would also require knowledge about the entire dependency
140+ // graph, which is likely as expensive as depending on all of these tasks.
141+ dependencies += pendingTasks
142+ }
143+ }
129144
130145 // Schedule the task.
131146 let task = Task ( priority: priority) { [ pendingTasks] in
@@ -139,14 +154,17 @@ package final class AsyncQueue<TaskMetadata: DependencyTracker>: Sendable {
139154
140155 let result = try await operation ( )
141156
142- pendingTasks. withLock { tasks in
143- tasks. removeAll ( where: { $0. id == id } )
157+ pendingTasks. withLock { tasksByMetadata in
158+ tasksByMetadata [ metadata, default: [ ] ] . removeAll ( where: { $0. id == id } )
159+ if tasksByMetadata [ metadata] ? . isEmpty ?? false {
160+ tasksByMetadata [ metadata] = nil
161+ }
144162 }
145163
146164 return result
147165 }
148166
149- tasks . append ( PendingTask ( task: task, metadata : metadata , id: id) )
167+ tasksByMetadata [ metadata , default : [ ] ] . append ( PendingTask ( task: task, id: id) )
150168
151169 return task
152170 }
0 commit comments