@@ -8,6 +8,7 @@ import kotlinx.atomicfu.*
88import kotlinx.coroutines.*
99import kotlinx.coroutines.internal.*
1010import java.io.*
11+ import java.lang.AssertionError
1112import java.util.concurrent.*
1213import java.util.concurrent.atomic.*
1314import java.util.concurrent.locks.*
@@ -272,12 +273,6 @@ internal class CoroutineScheduler(
272273 @JvmField
273274 val NOT_IN_STACK = Symbol (" NOT_IN_STACK" )
274275
275- // Local queue 'add' results
276- private const val ADDED = - 1
277- // Added to the local queue, but pool requires additional worker to keep up
278- private const val ADDED_REQUIRES_HELP = 0
279- private const val NOT_ADDED = 1
280-
281276 // Worker termination states
282277 private const val FORBIDDEN = - 1
283278 private const val ALLOWED = 0
@@ -351,18 +346,13 @@ internal class CoroutineScheduler(
351346 trackTask() // this is needed for virtual time support
352347 val task = createTask(block, taskContext)
353348 // try to submit the task to the local queue and act depending on the result
354- when (submitToLocalQueue(task, fair)) {
355- ADDED -> return
356- NOT_ADDED -> {
357- // try to offload task to global queue
358- if (! globalQueue.addLast(task)) {
359- // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
360- throw RejectedExecutionException (" $schedulerName was terminated" )
361- }
362- requestCpuWorker()
349+ if (! submitToLocalQueue(task, fair)) {
350+ if (! globalQueue.addLast(task)) {
351+ // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
352+ throw RejectedExecutionException (" $schedulerName was terminated" )
363353 }
364- else -> requestCpuWorker() // ask for help
365354 }
355+ requestCpuWorker()
366356 }
367357
368358 internal fun createTask (block : Runnable , taskContext : TaskContext ): Task {
@@ -420,40 +410,12 @@ internal class CoroutineScheduler(
420410 private fun tryUnpark (): Boolean {
421411 while (true ) {
422412 val worker = parkedWorkersStackPop() ? : return false
423- /*
424- * If we successfully took the worker out of the queue, it could be in the following states:
425- * 1) Worker is parked. Just wake up it and reset its termination deadline to avoid
426- * "termination during tryUnpark" race.
427- * 2) Worker is not parked and is rescanning the queue before actual parking.
428- * Worker state may be CPU_ACQUIRED or BLOCKING (has no permit, wants to terminate).
429- * 3) Worker is executing some task. We can't really distinguish it from the previous case, so just proceed.
430- * 4) Worker is terminated, proceed and try to find another one.
431- *
432- *
433- * Check that the thread we've found in the queue was indeed in parking state, before we
434- * actually try to unpark it.
435- */
436- val wasParking = worker.isParking
437- /*
438- * Send unpark signal anyway, because the thread may have made decision to park but have not yet set its
439- * state to parking and this could be the last thread we have (unparking random thread would not harm).
440- */
441- LockSupport .unpark(worker)
442- /*
443- * If worker was parking, then we can be sure that our signal is not lost.
444- * Otherwise it could be a thread in state "3", so let's try ti find another thread.
445- */
446- if (! wasParking) continue
447- /*
448- * Terminating worker could be selected.
449- * If it's already TERMINATED or we cannot forbid it from terminating, then try find another worker.
450- */
451- if (! worker.tryForbidTermination()) continue
452- /*
453- * Here we've successfully unparked a thread that was parked and had forbidden it from making
454- * decision to terminate, so we are now sure we've got some help.
455- */
456- return true
413+ val time = worker.minDelayUntilStealableTask // TODO explain
414+ worker.parkingAllowed = false
415+ if (worker.signallingAllowed && time == 0L ) {
416+ LockSupport .unpark(worker)
417+ }
418+ if (time == 0L && worker.tryForbidTermination()) return true
457419 }
458420 }
459421
@@ -489,57 +451,24 @@ internal class CoroutineScheduler(
489451 }
490452
491453 /* *
492- * Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
454+ * Returns `true` if added, `false` otherwise
493455 */
494- private fun submitToLocalQueue (task : Task , fair : Boolean ): Int {
495- val worker = currentWorker() ? : return NOT_ADDED
456+ private fun submitToLocalQueue (task : Task , fair : Boolean ): Boolean {
457+ val worker = currentWorker() ? : return false
496458
497459 /*
498460 * This worker could have been already terminated from this thread by close/shutdown and it should not
499461 * accept any more tasks into its local queue.
500462 */
501- if (worker.state == = WorkerState .TERMINATED ) return NOT_ADDED
502-
503- var result = ADDED
504- if (task.mode == TaskMode .NON_BLOCKING ) {
505- /*
506- * If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
507- * 1) Blocking worker is finishing its block and resumes non-blocking continuation
508- * 2) Blocking worker starts to create non-blocking jobs
509- *
510- * First use-case is expected (as recommended way of using blocking contexts),
511- * so we add non-blocking task to local queue, but also request CPU worker to mitigate second case
512- */
513- if (worker.isBlocking) {
514- result = ADDED_REQUIRES_HELP
515- } else {
516- /*
517- * If thread is not blocking, then it's just tries to finish its
518- * local work in order to park (or grab another blocking task), do not add non-blocking tasks
519- * to its local queue if it can't acquire CPU
520- */
521- val hasPermit = worker.tryAcquireCpuPermit()
522- if (! hasPermit) {
523- return NOT_ADDED
524- }
525- }
526- }
527-
528- val noOffloadingHappened = if (fair) {
529- worker.localQueue.addLast(task, globalQueue)
530- } else {
531- worker.localQueue.add(task, globalQueue)
532- }
533-
534- if (noOffloadingHappened) {
535- // When we're close to queue capacity, wake up anyone to steal work
536- // Note: non-atomic bufferSize here is Ok (it is just a performance optimization)
537- if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD ) {
538- return ADDED_REQUIRES_HELP
539- }
540- return result
541- }
542- return ADDED_REQUIRES_HELP
463+ if (worker.state == = WorkerState .TERMINATED ) return false
464+ if (task.mode == TaskMode .NON_BLOCKING && worker.isBlocking) {
465+ return false
466+ }
467+ val notAdded = with (worker.localQueue) {
468+ if (fair) addLast(task) else add(task)
469+ } ? : return true // Forgive me, Father, for this formatting
470+ globalQueue.addLast(notAdded)
471+ return true
543472 }
544473
545474 private fun currentWorker (): Worker ? = (Thread .currentThread() as ? Worker )?.takeIf { it.scheduler == this }
@@ -563,7 +492,7 @@ internal class CoroutineScheduler(
563492 val queueSizes = arrayListOf<String >()
564493 for (index in 1 until workers.length()) {
565494 val worker = workers[index] ? : continue
566- val queueSize = worker.localQueue.size()
495+ val queueSize = worker.localQueue.size
567496 when (worker.state) {
568497 WorkerState .PARKING -> ++ parkedWorkers
569498 WorkerState .BLOCKING -> {
@@ -668,6 +597,10 @@ internal class CoroutineScheduler(
668597 */
669598 @Volatile
670599 var nextParkedWorker: Any? = NOT_IN_STACK
600+ @Volatile // TODO ughm don't ask
601+ var parkingAllowed = false
602+ @Volatile
603+ var signallingAllowed = false
671604
672605 /* *
673606 * Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails.
@@ -719,8 +652,12 @@ internal class CoroutineScheduler(
719652 private var lastExhaustionTime = 0L
720653
721654 private var rngState = Random .nextInt()
722- // The delay until at least one task in other worker queues will become stealable
723- private var minDelayUntilStealableTask = 0L
655+ /*
656+ * The delay until at least one task in other worker queues will become stealable.
657+ * Volatile to avoid benign data-race
658+ */
659+ @Volatile
660+ public var minDelayUntilStealableTask = 0L
724661
725662 override fun run () = runWorker()
726663
@@ -744,15 +681,21 @@ internal class CoroutineScheduler(
744681 *
745682 * Park duration depends on the possible state: either this is the idleWorkerKeepAliveNs or stealing deadline.
746683 */
684+ parkingAllowed = true
747685 if (parkedWorkersStackPush(this )) {
748686 continue
749687 } else {
750- tryReleaseCpu(WorkerState .PARKING )
751- if (minDelayUntilStealableTask > 0 ) {
752- LockSupport .parkNanos(minDelayUntilStealableTask) // No spurious wakeup check here
753- } else {
754- park()
688+ signallingAllowed = true
689+ if (parkingAllowed) {
690+ tryReleaseCpu(WorkerState .PARKING )
691+ if (minDelayUntilStealableTask > 0 ) {
692+ LockSupport .parkNanos(minDelayUntilStealableTask) // No spurious wakeup check here
693+ } else {
694+ assert { localQueue.size == 0 }
695+ park()
696+ }
755697 }
698+ signallingAllowed = false
756699 }
757700 }
758701 tryReleaseCpu(WorkerState .TERMINATED )
@@ -800,7 +743,7 @@ internal class CoroutineScheduler(
800743 val currentState = state
801744 // Shutdown sequence of blocking dispatcher
802745 if (currentState != = WorkerState .TERMINATED ) {
803- assert { currentState == WorkerState .BLOCKING } // "Expected BLOCKING state, but has $currentState"
746+ assert { ( currentState == WorkerState .BLOCKING ). also { if ( ! it) throw AssertionError ( " AAA: $currentState " ) } } // "Expected BLOCKING state, but has $currentState"
804747 state = WorkerState .RETIRING
805748 }
806749 }
@@ -910,10 +853,12 @@ internal class CoroutineScheduler(
910853 * Checks whether new blocking tasks arrived to the pool when worker decided
911854 * it can go to deep park/termination and puts recently arrived task to its local queue.
912855 * Returns `true` if there is no blocking tasks in the queue.
856+ * Invariant: invoked only with empty local queue
913857 */
914858 private fun blockingQuiescence (): Boolean {
859+ assert { localQueue.size == 0 }
915860 globalQueue.removeFirstWithModeOrNull(TaskMode .PROBABLY_BLOCKING )?.let {
916- localQueue.add(it, globalQueue )
861+ localQueue.add(it)
917862 return false
918863 }
919864 return true
@@ -960,6 +905,7 @@ internal class CoroutineScheduler(
960905 }
961906
962907 private fun trySteal (): Task ? {
908+ assert { localQueue.size == 0 }
963909 val created = createdWorkers
964910 // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
965911 if (created < 2 ) {
@@ -973,7 +919,8 @@ internal class CoroutineScheduler(
973919 if (currentIndex > created) currentIndex = 1
974920 val worker = workers[currentIndex]
975921 if (worker != = null && worker != = this ) {
976- val stealResult = localQueue.trySteal(worker.localQueue, globalQueue)
922+ assert { localQueue.size == 0 }
923+ val stealResult = localQueue.tryStealFrom(victim = worker.localQueue)
977924 if (stealResult == TASK_STOLEN ) {
978925 return localQueue.poll()
979926 } else if (stealResult > 0 ) {
0 commit comments