@@ -15,6 +15,7 @@ import Dispatch
1515import class Foundation. NSLock
1616import class Foundation. ProcessInfo
1717import struct Foundation. URL
18+ import struct Foundation. UUID
1819import func TSCBasic. tsc_await
1920
2021public enum Concurrency {
@@ -76,3 +77,203 @@ extension DispatchQueue {
7677 }
7778 }
7879}
80+
81+ /// A queue for running async operations with a limit on the number of concurrent tasks.
82+ public final class AsyncOperationQueue : @unchecked Sendable {
83+
84+ // This implementation is identical to the AsyncOperationQueue in swift-build.
85+ // Any modifications made here should also be made there.
86+ // https://github.com/swiftlang/swift-build/blob/main/Sources/SWBUtil/AsyncOperationQueue.swift#L13
87+
88+ fileprivate typealias ID = UUID
89+ fileprivate typealias WaitingContinuation = CheckedContinuation < Void , any Error >
90+
91+ private let concurrentTasks : Int
92+ private var waitingTasks : [ WorkTask ] = [ ]
93+ private let waitingTasksLock = NSLock ( )
94+
95+ fileprivate enum WorkTask {
96+ case creating( ID )
97+ case waiting( ID , WaitingContinuation )
98+ case running( ID )
99+ case cancelled( ID )
100+
101+ var id : ID {
102+ switch self {
103+ case . creating( let id) , . waiting( let id, _) , . running( let id) , . cancelled( let id) :
104+ return id
105+ }
106+ }
107+
108+ var continuation : WaitingContinuation ? {
109+ guard case . waiting( _, let continuation) = self else {
110+ return nil
111+ }
112+ return continuation
113+ }
114+ }
115+
116+ /// Creates an `AsyncOperationQueue` with a specified number of concurrent tasks.
117+ /// - Parameter concurrentTasks: The maximum number of concurrent tasks that can be executed concurrently.
118+ public init ( concurrentTasks: Int ) {
119+ self . concurrentTasks = concurrentTasks
120+ }
121+
122+ deinit {
123+ waitingTasksLock. withLock {
124+ if !waitingTasks. isEmpty {
125+ preconditionFailure ( " Deallocated with waiting tasks " )
126+ }
127+ }
128+ }
129+
130+ /// Executes an asynchronous operation, ensuring that the number of concurrent tasks
131+ // does not exceed the specified limit.
132+ /// - Parameter operation: The asynchronous operation to execute.
133+ /// - Returns: The result of the operation.
134+ /// - Throws: An error thrown by the operation, or a `CancellationError` if the operation is cancelled.
135+ public func withOperation< ReturnValue> (
136+ _ operation: ( ) async throws -> sending ReturnValue
137+ ) async throws -> ReturnValue {
138+ let taskId = try await waitIfNeeded ( )
139+ defer { signalCompletion ( taskId) }
140+ return try await operation ( )
141+ }
142+
143+ private func waitIfNeeded( ) async throws -> ID {
144+ let workTask = waitingTasksLock. withLock ( {
145+ let shouldWait = waitingTasks. count >= concurrentTasks
146+ let workTask = shouldWait ? WorkTask . creating ( ID ( ) ) : . running( ID ( ) )
147+ waitingTasks. append ( workTask)
148+ return workTask
149+ } )
150+
151+ // If we aren't creating a task that needs to wait, we're under the concurrency limit.
152+ guard case . creating( let taskId) = workTask else {
153+ return workTask. id
154+ }
155+
156+ enum TaskAction {
157+ case start( WaitingContinuation )
158+ case cancel( WaitingContinuation )
159+ }
160+
161+ try await withTaskCancellationHandler {
162+ try await withCheckedThrowingContinuation { ( continuation: WaitingContinuation ) -> Void in
163+ let action : TaskAction ? = waitingTasksLock. withLock {
164+ guard let index = waitingTasks. firstIndex ( where: { $0. id == taskId } ) else {
165+ // The task may have been marked as cancelled already and then removed from
166+ // waitingTasks in `signalCompletion`.
167+ return . cancel( continuation)
168+ }
169+
170+ switch waitingTasks [ index] {
171+ case . cancelled:
172+ // If the task was cancelled in between creating the task cancellation handler and acquiring the lock,
173+ // we should resume the continuation with a `CancellationError`.
174+ waitingTasks. remove ( at: index)
175+ return . cancel( continuation)
176+ case . creating, . running, . waiting:
177+ // A task may have completed since we initially checked if we should wait. Check again in this locked
178+ // section and if we can start it, remove it from the waiting tasks and start it immediately.
179+ if waitingTasks. count >= concurrentTasks {
180+ waitingTasks [ index] = . waiting( taskId, continuation)
181+ return nil
182+ } else {
183+ waitingTasks. remove ( at: index)
184+ return . start( continuation)
185+ }
186+ }
187+ }
188+
189+ switch action {
190+ case . some( . cancel( let continuation) ) :
191+ continuation. resume ( throwing: _Concurrency. CancellationError ( ) )
192+ case . some( . start( let continuation) ) :
193+ continuation. resume ( )
194+ case . none:
195+ return
196+ }
197+ }
198+ } onCancel: {
199+ let continuation : WaitingContinuation ? = self . waitingTasksLock. withLock {
200+ guard let taskIndex = self . waitingTasks. firstIndex ( where: { $0. id == taskId } ) else {
201+ return nil
202+ }
203+
204+ switch self . waitingTasks [ taskIndex] {
205+ case . waiting( _, let continuation) :
206+ self . waitingTasks. remove ( at: taskIndex)
207+
208+ // If the parent task is cancelled then we need to manually handle resuming the
209+ // continuation for the waiting task with a `CancellationError`. Return the continuation
210+ // here so it can be resumed once the `waitingTasksLock` is released.
211+ return continuation
212+ case . creating, . running:
213+ // If the task was still being created, mark it as cancelled in `waitingTasks` so that
214+ // the handler for `withCheckedThrowingContinuation` can immediately cancel it.
215+ self . waitingTasks [ taskIndex] = . cancelled( taskId)
216+ return nil
217+ case . cancelled:
218+ preconditionFailure ( " Attempting to cancel a task that was already cancelled " )
219+ }
220+ }
221+
222+ continuation? . resume ( throwing: _Concurrency. CancellationError ( ) )
223+ }
224+ return workTask. id
225+ }
226+
227+ private func signalCompletion( _ taskId: ID ) {
228+ let continuationToResume = waitingTasksLock. withLock { ( ) -> WaitingContinuation ? in
229+ guard !waitingTasks. isEmpty else {
230+ return nil
231+ }
232+
233+ // Remove the completed task from the list to decrement the active task count.
234+ if let taskIndex = self . waitingTasks. firstIndex ( where: { $0. id == taskId } ) {
235+ waitingTasks. remove ( at: taskIndex)
236+ }
237+
238+ // We cannot remove elements from `waitingTasks` while iterating over it, so we make
239+ // a pass to collect operations and then apply them after the loop.
240+ func createTaskListOperations( ) -> ( CollectionDifference < WorkTask > ? , WaitingContinuation ? ) {
241+ var changes : [ CollectionDifference < WorkTask > . Change ] = [ ]
242+ for (index, task) in waitingTasks. enumerated ( ) {
243+ switch task {
244+ case . running:
245+ // Skip tasks that are already running, looking for the first one that is waiting or creating.
246+ continue
247+ case . creating:
248+ // If the next task is in the process of being created, let the
249+ // creation code in the `withCheckedThrowingContinuation` in `waitIfNeeded`
250+ // handle starting the task.
251+ break
252+ case . waiting:
253+ // Begin the next waiting task
254+ changes. append ( . remove( offset: index, element: task, associatedWith: nil ) )
255+ return ( CollectionDifference < WorkTask > ( changes) , task. continuation)
256+ case . cancelled:
257+ // If the next task is cancelled, continue removing cancelled
258+ // tasks until we find one that hasn't run yet, or we exaust the list of waiting tasks.
259+ changes. append ( . remove( offset: index, element: task, associatedWith: nil ) )
260+ continue
261+ }
262+ }
263+ return ( CollectionDifference < WorkTask > ( changes) , nil )
264+ }
265+
266+ let ( collectionOperations, continuation) = createTaskListOperations ( )
267+ if let operations = collectionOperations {
268+ guard let appliedDiff = waitingTasks. applying ( operations) else {
269+ preconditionFailure ( " Failed to apply changes to waiting tasks " )
270+ }
271+ waitingTasks = appliedDiff
272+ }
273+
274+ return continuation
275+ }
276+
277+ continuationToResume? . resume ( )
278+ }
279+ }
0 commit comments