@@ -97,18 +97,77 @@ extension ExecutorJob {
9797 }
9898}
9999
100+ #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
101+ /// A wait queue is a specialised priority queue used to run a timer.
102+ @available ( StdlibDeploymentTarget 6 . 2 , * )
103+ struct WaitQueue {
104+ var queue : PriorityQueue < UnownedJob >
105+ var clock : _ClockID
106+
107+ init ( clock: _ClockID ) {
108+ queue = PriorityQueue ( compare: {
109+ ExecutorJob ( $0) . cooperativeExecutorTimestamp
110+ < ExecutorJob( $1) . cooperativeExecutorTimestamp
111+ } )
112+ self . clock = clock
113+ }
114+
115+ var currentTime : CooperativeExecutor . Timestamp {
116+ var now : CooperativeExecutor . Timestamp = . zero
117+ unsafe _getTime ( seconds: & now. seconds,
118+ nanoseconds: & now. nanoseconds,
119+ clock: clock. rawValue)
120+ return now
121+ }
122+
123+ mutating func enqueue( _ job: consuming ExecutorJob ,
124+ after delay: CooperativeExecutor . Duration ) {
125+ let deadline = currentTime + delay
126+ job. setupCooperativeExecutorTimestamp ( )
127+ job. cooperativeExecutorTimestamp = deadline
128+ queue. push ( UnownedJob ( job) )
129+ }
130+
131+ mutating func forEachReadyJob( body: ( consuming ExecutorJob ) -> ( ) ) {
132+ let now = currentTime
133+ while let job = queue. pop (
134+ when: {
135+ ExecutorJob ( $0) . cooperativeExecutorTimestamp < now
136+ } ) {
137+ var theJob = ExecutorJob ( job)
138+ theJob. clearCooperativeExecutorTimestamp ( )
139+ body ( theJob)
140+ }
141+ }
142+
143+ var timeToNextJob : CooperativeExecutor . Duration ? {
144+ if let job = queue. top {
145+ let deadline = ExecutorJob ( job) . cooperativeExecutorTimestamp
146+ let now = currentTime
147+ if deadline > now {
148+ return deadline - now
149+ } else {
150+ return CooperativeExecutor . Duration ( seconds: 0 , nanoseconds: 0 )
151+ }
152+ }
153+ return nil
154+ }
155+ }
156+ #endif
157+
100158/// A co-operative executor that can be used as the main executor or as a
101159/// task executor.
102160@available ( StdlibDeploymentTarget 6 . 2 , * )
103161final class CooperativeExecutor : Executor , @unchecked Sendable {
104162 var runQueue : PriorityQueue < UnownedJob >
105163 #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
106- var waitQueue : PriorityQueue < UnownedJob >
164+ var suspendingWaitQueue = WaitQueue ( clock: . suspending)
165+ var continuousWaitQueue = WaitQueue ( clock: . continuous)
107166 #endif
108167 var shouldStop : Bool = false
109168
110169 /// Internal representation of a duration for CooperativeExecutor
111- struct Duration {
170+ struct Duration : Comparable {
112171 var seconds : Int64
113172 var nanoseconds : Int64
114173
@@ -122,6 +181,16 @@ final class CooperativeExecutor: Executor, @unchecked Sendable {
122181 self . seconds = seconds
123182 self . nanoseconds = attoseconds / 1_000_000_000
124183 }
184+
185+ static func == ( lhs: Duration , rhs: Duration ) -> Bool {
186+ return lhs. seconds == rhs. seconds && lhs. nanoseconds == rhs. nanoseconds
187+ }
188+ static func < ( lhs: Duration , rhs: Duration ) -> Bool {
189+ return lhs. seconds < rhs. seconds || (
190+ lhs. seconds == rhs. seconds
191+ && lhs. nanoseconds < rhs. nanoseconds
192+ )
193+ }
125194 }
126195
127196 /// Internal representation of a timestamp for CooperativeExecutor
@@ -165,14 +234,6 @@ final class CooperativeExecutor: Executor, @unchecked Sendable {
165234
166235 public init ( ) {
167236 runQueue = PriorityQueue ( compare: { $0. priority > $1. priority } )
168-
169- #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
170- waitQueue =
171- PriorityQueue ( compare: {
172- ExecutorJob ( $0) . cooperativeExecutorTimestamp
173- < ExecutorJob( $1) . cooperativeExecutorTimestamp
174- } )
175- #endif
176237 }
177238
178239 public func enqueue( _ job: consuming ExecutorJob ) {
@@ -190,11 +251,11 @@ extension CooperativeExecutor: SchedulingExecutor {
190251 return self
191252 }
192253
193- var currentTime : Timestamp {
254+ func currentTime( clock : _ClockID ) -> Timestamp {
194255 var now : Timestamp = . zero
195256 unsafe _getTime( seconds: & now. seconds,
196257 nanoseconds: & now. nanoseconds,
197- clock: _ClockID . suspending . rawValue)
258+ clock: clock . rawValue)
198259 return now
199260 }
200261
@@ -203,24 +264,19 @@ extension CooperativeExecutor: SchedulingExecutor {
203264 tolerance: C . Duration ? = nil ,
204265 clock: C ) {
205266 // If it's a clock we know, get the duration to wait
206- let duration : Duration
207267 if let _ = clock as? ContinuousClock {
208- // We would need to add a second wait queue to support this
209- fatalError ( " CooperativeExecutor currently only supports suspending waits " )
268+ let continuousDuration = delay as! ContinuousClock . Duration
269+ let duration = Duration ( from: continuousDuration)
270+ continuousWaitQueue. enqueue ( job, after: duration)
210271 } else if let _ = clock as? SuspendingClock {
211272 let suspendingDuration = delay as! SuspendingClock . Duration
212- duration = Duration ( from: suspendingDuration)
273+ let duration = Duration ( from: suspendingDuration)
274+ suspendingWaitQueue. enqueue ( job, after: duration)
213275 } else {
214276 clock. enqueue ( job, on: self , at: clock. now. advanced ( by: delay) ,
215277 tolerance: tolerance)
216278 return
217279 }
218-
219- let deadline = self . currentTime + duration
220-
221- job. setupCooperativeExecutorTimestamp ( )
222- job. cooperativeExecutorTimestamp = deadline
223- waitQueue. push ( UnownedJob ( job) )
224280 }
225281
226282}
@@ -236,14 +292,12 @@ extension CooperativeExecutor: RunLoopExecutor {
236292 shouldStop = false
237293 while !shouldStop && !condition( ) {
238294 #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
239- // Process the timer queue
240- let now = currentTime
241- while let job = waitQueue. pop ( when: {
242- ExecutorJob ( $0) . cooperativeExecutorTimestamp <= now
243- } ) {
244- var theJob = ExecutorJob ( job)
245- theJob. clearCooperativeExecutorTimestamp ( )
246- runQueue. push ( job)
295+ // Process the timer queues
296+ suspendingWaitQueue. forEachReadyJob {
297+ runQueue. push ( UnownedJob ( $0) )
298+ }
299+ continuousWaitQueue. forEachReadyJob {
300+ runQueue. push ( UnownedJob ( $0) )
247301 }
248302 #endif
249303
@@ -257,14 +311,17 @@ extension CooperativeExecutor: RunLoopExecutor {
257311
258312 #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
259313 // Finally, wait until the next deadline
260- if let job = waitQueue. top {
261- let deadline = ExecutorJob ( job) . cooperativeExecutorTimestamp
262- let now = self . currentTime
263- if deadline > now {
264- let toWait = deadline - now
265- _sleep ( seconds: toWait. seconds,
266- nanoseconds: toWait. nanoseconds)
314+ var toWait: Duration? = suspendingWaitQueue. timeToNextJob
315+
316+ if let continuousToWait = continuousWaitQueue . timeToNextJob {
317+ if toWait == nil || continuousToWait < toWait! {
318+ toWait = continuousToWait
267319 }
320+ }
321+
322+ if let toWait {
323+ _sleep ( seconds: toWait. seconds,
324+ nanoseconds: toWait. nanoseconds)
268325 } else if runQueue . isEmpty {
269326 // Stop if no more jobs are available
270327 break
0 commit comments