@@ -331,12 +331,13 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
331331 /// The queue of pending tasks that haven't been scheduled for execution yet.
332332 private var pendingTasks : [ QueuedTask < TaskDescription > ] = [ ]
333333
334- /// An ordered list of task priorities to the number of tasks that might execute concurrently at that ( or a higher)
335- /// priority.
334+ /// An ordered list of task priorities to the number of tasks that might execute concurrently at that or a lower
335+ /// priority. As the task priority is decreased, the number of current tasks becomes more restricted.
336336 ///
337- /// This list is sorted in descending priority order .
337+ /// This list is normalized according to `normalize(maxConcurrentTasksByPriority:)` .
338338 ///
339- /// The `maxConcurrentTasks` of the last element in this list is also used for tasks with a lower priority.
339+ /// The highest priority entry in this list restricts the total number of tasks that can be executed at any priority
340+ /// (including priorities higher than its entry).
340341 ///
341342 /// For example if you have
342343 /// ```swift
@@ -347,38 +348,40 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
347348 /// ```
348349 ///
349350 /// Then we allow the following number of concurrent tasks at the following priorities
350- /// - `.high`: 4
351+ /// - `.high`: 4 (because `.medium: 4` restricts the total number of tasks to 4)
351352 /// - `.medium`: 4
352353 /// - `.low`: 2
353354 /// - `.background`: 2
355+ ///
356+ /// When combining tasks with different priorities:
357+ /// - If we have 3 medium priority tasks, we can have at most 1 low priority task
358+ /// - If we have 1 medium priority task, we can still have 2 low priority tasks, but no more
354359 private var maxConcurrentTasksByPriority : [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] {
355360 didSet {
356- // These preconditions need to match the ones in `init`
357- maxConcurrentTasksByPriority = maxConcurrentTasksByPriority. sorted ( by: { $0. priority > $1. priority } )
358- precondition ( maxConcurrentTasksByPriority. allSatisfy { $0. maxConcurrentTasks >= 0 } )
359- precondition ( maxConcurrentTasksByPriority. map ( \. maxConcurrentTasks) . isSorted ( descending: true ) )
360- precondition ( !maxConcurrentTasksByPriority. isEmpty)
361-
362- // Check we are over-subscribed in currently executing tasks. If we are, cancel currently executing task to be
363- // rescheduled until we are within the new limit.
364- var tasksToReschedule : [ QueuedTask < TaskDescription > ] = [ ]
365- for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
366- var tasksInPrioritySlot = currentlyExecutingTasks. filter { $0. priority <= priority }
367- if tasksInPrioritySlot. count <= maxConcurrentTasks {
368- // We have enough available slots. Nothing to do.
369- continue
370- }
371- tasksInPrioritySlot = tasksInPrioritySlot. sorted { $0. priority > $1. priority }
372- while tasksInPrioritySlot. count > maxConcurrentTasks {
373- // Cancel the task with the lowest priority (because it is least important and also takes a slot in the lower
374- // priority execution buckets) and the among those the most recent one because it has probably made the least
375- // progress.
376- guard let mostRecentTaskInSlot = tasksInPrioritySlot. popLast ( ) else {
377- // Should never happen because `tasksInPrioritySlot.count > maxConcurrentTasks >= 0`
378- logger. fault ( " Unexpectedly unable to pop last task from tasksInPrioritySlot " )
379- break
380- }
381- tasksToReschedule. append ( mostRecentTaskInSlot)
361+ maxConcurrentTasksByPriority = Self . normalize ( maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
362+
363+ if maxConcurrentTasksByPriority. count == oldValue. count,
364+ zip ( maxConcurrentTasksByPriority, oldValue) . allSatisfy ( == )
365+ {
366+ // We didn't actually change anything, so we don't need to perform any validation or task processing.
367+ return
368+ }
369+
370+ // Check we are over-subscribed in currently executing tasks by walking through all currently executing tasks and
371+ // checking if we could schedule them within the new execution limits. Cancel any tasks that do not fit within the
372+ // new limit to be rescheduled when we are within the limit again.
373+ var currentlyExecutingTaskDetails : [ ( priority: TaskPriority , estimatedCPUCoreCount: Int ) ] = [ ]
374+ var tasksToCancelAndReschedule : [ QueuedTask < TaskDescription > ] = [ ]
375+ for task in currentlyExecutingTasks. sorted ( by: { $0. priority > $1. priority } ) {
376+ let taskPriority = task. priority
377+ if Self . canScheduleTask (
378+ withPriority: taskPriority,
379+ maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
380+ currentlyExecutingTaskDetails: currentlyExecutingTaskDetails
381+ ) {
382+ currentlyExecutingTaskDetails. append ( ( taskPriority, task. description. estimatedCPUCoreCount) )
383+ } else {
384+ tasksToCancelAndReschedule. append ( task)
382385 }
383386 }
384387
@@ -390,7 +393,7 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
390393 // would cancel the tasks and then immediately reschedule it – while that's doing unnecessary work, it's still
391394 // correct.
392395 Task . detached ( priority: . high) {
393- for tasksToReschedule in tasksToReschedule {
396+ for tasksToReschedule in tasksToCancelAndReschedule {
394397 await tasksToReschedule. cancelToBeRescheduled ( )
395398 }
396399 }
@@ -406,11 +409,7 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
406409 }
407410
408411 package init ( maxConcurrentTasksByPriority: [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] ) {
409- // These preconditions need to match the ones in `maxConcurrentTasksByPriority:didSet`
410- self . maxConcurrentTasksByPriority = maxConcurrentTasksByPriority. sorted ( by: { $0. priority > $1. priority } )
411- precondition ( maxConcurrentTasksByPriority. allSatisfy { $0. maxConcurrentTasks >= 0 } )
412- precondition ( maxConcurrentTasksByPriority. map ( \. maxConcurrentTasks) . isSorted ( descending: true ) )
413- precondition ( !maxConcurrentTasksByPriority. isEmpty)
412+ self . maxConcurrentTasksByPriority = Self . normalize ( maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
414413 }
415414
416415 /// Enqueue a new task to be executed.
@@ -448,16 +447,58 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
448447 return queuedTask
449448 }
450449
451- /// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
452- private func maxConcurrentTasks( at priority: TaskPriority ) -> Int {
453- for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
454- if atPriority <= priority {
455- return maxConcurrentTasks
450+ private static func normalize(
451+ maxConcurrentTasksByPriority: [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ]
452+ ) -> [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] {
453+ var maxConcurrentTasksByPriority = maxConcurrentTasksByPriority
454+
455+ // Ensure elements are sorted decreasingly by priority.
456+ maxConcurrentTasksByPriority = maxConcurrentTasksByPriority. sorted ( by: { $0. priority > $1. priority } )
457+
458+ // Ensure array is not empty.
459+ if maxConcurrentTasksByPriority. isEmpty {
460+ logger. fault ( " Received empty maxConcurrentTasksByPriority. Allowing as many tasks as there are processor cores. " )
461+ maxConcurrentTasksByPriority = [ ( . medium, ProcessInfo . processInfo. processorCount) ]
462+ }
463+
464+ // Ensure `maxConcurrentTasks` is not increasing with lower priority tasks.
465+ var lastMaxConcurrentTasks = maxConcurrentTasksByPriority. first!. maxConcurrentTasks
466+ for i in 1 ..< maxConcurrentTasksByPriority. count {
467+ if maxConcurrentTasksByPriority [ i] . maxConcurrentTasks > lastMaxConcurrentTasks {
468+ logger. fault ( " More tasks allowed for lower priority than for higher priority " )
469+ maxConcurrentTasksByPriority [ i] . maxConcurrentTasks = lastMaxConcurrentTasks
470+ } else {
471+ lastMaxConcurrentTasks = maxConcurrentTasksByPriority [ i] . maxConcurrentTasks
456472 }
457473 }
458- // `last!` is fine because the initializer of `maxConcurrentTasksByPriority` has a precondition that
459- // `maxConcurrentTasksByPriority` is not empty.
460- return maxConcurrentTasksByPriority. last!. maxConcurrentTasks
474+
475+ return maxConcurrentTasksByPriority
476+ }
477+
478+ /// Returns `true` if we can schedule a task with the given priority, assuming that the currently executing tasks have
479+ /// the given priorities.
480+ package static func canScheduleTask(
481+ withPriority newTaskPriority: TaskPriority ,
482+ maxConcurrentTasksByPriority: [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] ,
483+ currentlyExecutingTaskDetails: [ ( priority: TaskPriority , estimatedCPUCoreCount: Int ) ]
484+ ) -> Bool {
485+ if currentlyExecutingTaskDetails. sum ( of: \. estimatedCPUCoreCount)
486+ >= maxConcurrentTasksByPriority. first!. maxConcurrentTasks
487+ {
488+ return false
489+ }
490+ for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
491+ guard priority >= newTaskPriority else {
492+ // This limit does not affect the new task
493+ continue
494+ }
495+ if currentlyExecutingTaskDetails. filter ( { $0. priority <= priority } ) . sum ( of: \. estimatedCPUCoreCount)
496+ >= maxConcurrentTasks
497+ {
498+ return false
499+ }
500+ }
501+ return true
461502 }
462503
463504 /// Poke the execution of more tasks in the queue.
@@ -466,12 +507,19 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
466507 private func poke( ) {
467508 pendingTasks. sort ( by: { $0. priority > $1. priority } )
468509 for task in pendingTasks {
469- if currentlyExecutingTasks. map ( \. description. estimatedCPUCoreCount) . sum ( ) >= maxConcurrentTasks ( at: task. priority)
470- {
510+ guard
511+ Self . canScheduleTask (
512+ withPriority: task. priority,
513+ maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
514+ currentlyExecutingTaskDetails: currentlyExecutingTasks. map ( {
515+ ( $0. priority, $0. description. estimatedCPUCoreCount)
516+ } )
517+ )
518+ else {
471519 // We don't have any execution slots left. Thus, this poker has nothing to do and is done.
472520 // When the next task finishes, it calls `poke` again.
473- // If the low priority task's priority gets elevated that task's priority will get elevated and it will be
474- // picked up on the next `poke` call .
521+ // If a low priority task's priority gets elevated that task's priority will get elevated, which will call
522+ // `poke`.
475523 return
476524 }
477525 let dependencies = task. description. dependencies ( to: currentlyExecutingTasks. map ( \. description) )
@@ -600,11 +648,11 @@ fileprivate extension Collection where Element: Comparable {
600648 }
601649}
602650
603- fileprivate extension Collection < Int > {
604- func sum( ) -> Int {
651+ fileprivate extension Collection {
652+ func sum( of transform : ( Self . Element ) -> Int ) -> Int {
605653 var result = 0
606654 for element in self {
607- result += element
655+ result += transform ( element)
608656 }
609657 return result
610658 }
0 commit comments