@@ -54,9 +54,14 @@ import java.util.concurrent.locks.*
5454 */
5555@Suppress(" NOTHING_TO_INLINE" )
5656internal class CoroutineScheduler (
57+ private val schedulerName : String ,
5758 private val corePoolSize : Int ,
5859 private val maxPoolSize : Int
5960) : Closeable {
61+ constructor (
62+ corePoolSize: Int ,
63+ maxPoolSize: Int
64+ ) : this (" CoroutineScheduler" , corePoolSize, maxPoolSize)
6065
6166 private val globalQueue: GlobalQueue = GlobalQueue ()
6267
@@ -161,16 +166,16 @@ internal class CoroutineScheduler(
161166 private val isTerminated = atomic(false )
162167
163168 companion object {
164- private const val MAX_SPINS = 1000L
165- private const val MAX_YIELDS = 500L
166-
167- @JvmStatic
168- private val MAX_PARK_TIME_NS = TimeUnit .SECONDS .toNanos(1 )
169+ private const val MAX_SPINS = 1000
170+ private const val MAX_YIELDS = MAX_SPINS + 500
171+
172+ @JvmStatic // Note, that is fits into Int (it is is equal to 10^9)
173+ private val MAX_PARK_TIME_NS = TimeUnit .SECONDS .toNanos(1 ).toInt()
169174
170175 @JvmStatic
171176 private val MIN_PARK_TIME_NS = (WORK_STEALING_TIME_RESOLUTION_NS / 4 )
172177 .coerceAtLeast(10 )
173- .coerceAtMost(MAX_PARK_TIME_NS )
178+ .coerceAtMost(MAX_PARK_TIME_NS .toLong()).toInt( )
174179
175180 // A symbol to mark workers that are not in parkedWorkersStack
176181 private val NOT_IN_STACK = Symbol (" NOT_IN_STACK" )
@@ -417,14 +422,10 @@ internal class CoroutineScheduler(
417422 var blockingWorkers = 0
418423 var cpuWorkers = 0
419424 var retired = 0
420- var finished = 0
421-
425+ var terminated = 0
422426 val queueSizes = arrayListOf<String >()
423427 for (worker in workers) {
424- if (worker == null ) {
425- continue
426- }
427-
428+ if (worker == null ) continue
428429 val queueSize = worker.localQueue.size()
429430 when (worker.state) {
430431 WorkerState .PARKING -> ++ parkedWorkers
@@ -440,29 +441,44 @@ internal class CoroutineScheduler(
440441 ++ retired
441442 if (queueSize > 0 ) queueSizes + = queueSize.toString() + " r" // Retiring
442443 }
443- WorkerState .FINISHED -> ++ finished
444+ WorkerState .TERMINATED -> ++ terminated
444445 }
445446 }
446-
447- return " ${super .toString()} [core pool size = $corePoolSize , " +
448- " CPU workers = $cpuWorkers , " +
449- " blocking workers = $blockingWorkers , " +
450- " parked workers = $parkedWorkers , " +
451- " retired workers = $retired , " +
452- " finished workers = $finished , " +
447+ val state = controlState.value
448+ return " $schedulerName @$hexAddress [" +
449+ " Pool Size {" +
450+ " core = $corePoolSize , " +
451+ " max = $maxPoolSize }, " +
452+ " Worker States {" +
453+ " CPU = $cpuWorkers , " +
454+ " blocking = $blockingWorkers , " +
455+ " parked = $parkedWorkers , " +
456+ " retired = $retired , " +
457+ " terminated = $terminated }, " +
453458 " running workers queues = $queueSizes , " +
454- " global queue size = ${globalQueue.size} ], " +
455- " control state: ${controlState.value} "
459+ " global queue size = ${globalQueue.size} , " +
460+ " Control State Workers {" +
461+ " created = ${createdWorkers(state)} , " +
462+ " blocking = ${blockingWorkers(state)} }" +
463+ " ]"
456464 }
457465
458- // todo: make name of the pool configurable (optional parameter to CoroutineScheduler) and base thread names on it
459- internal inner class Worker (sequenceNumber : Int ) : Thread(" CoroutineScheduler-worker-$sequenceNumber " ) {
466+ internal inner class Worker private constructor() : Thread() {
460467 init {
461468 isDaemon = true
462469 }
463470
464471 // guarded by scheduler lock
465- private var indexInArray = sequenceNumber
472+ private var indexInArray = - 1
473+ set(index) {
474+ name = " $schedulerName -worker-${if (index < 0 ) " TERMINATED" else index.toString()} "
475+ field = index
476+ }
477+
478+ constructor (index: Int ) : this () {
479+ indexInArray = index
480+ }
481+
466482 val localQueue: WorkQueue = WorkQueue ()
467483
468484 /* *
@@ -552,15 +568,16 @@ internal class CoroutineScheduler(
552568 private var lastExhaustionTime = 0L
553569
554570 @Volatile // Required for concurrent idleResetBeforeUnpark
555- private var spins = 0L
556- private var yields = 0L // TODO replace with IntPair when inline classes arrive
571+ private var spins = 0 // spins until MAX_SPINS, then yields until MAX_YIELDS
557572
573+ // Note: it is concurrently reset by idleResetBeforeUnpark
558574 private var parkTimeNs = MIN_PARK_TIME_NS
575+
559576 private var rngState = random.nextInt()
560577
561578 override fun run () {
562579 var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
563- while (! isTerminated.value && state != WorkerState .FINISHED ) {
580+ while (! isTerminated.value && state != WorkerState .TERMINATED ) {
564581 val task = findTask()
565582 if (task == null ) {
566583 // Wait for a job with potential park
@@ -581,7 +598,7 @@ internal class CoroutineScheduler(
581598 }
582599 }
583600
584- tryReleaseCpu(WorkerState .FINISHED )
601+ tryReleaseCpu(WorkerState .TERMINATED )
585602 }
586603
587604 private fun runSafely (block : Runnable ) {
@@ -655,20 +672,16 @@ internal class CoroutineScheduler(
655672 * The main idea is not to park while it's possible (otherwise throughput on asymmetric workloads suffers due to too frequent
656673 * park/unpark calls and delays between job submission and thread queue checking)
657674 */
658- when {
659- // Volatile read spins, shall be read first
660- spins < MAX_SPINS -> ++ spins
661- yields <= MAX_YIELDS -> {
662- ++ yields
663- yield ()
664- }
665- else -> {
666- if (parkTimeNs < MAX_PARK_TIME_NS ) {
667- parkTimeNs = (parkTimeNs * 3 shr 1 ).coerceAtMost(MAX_PARK_TIME_NS )
668- }
669- tryReleaseCpu(WorkerState .PARKING )
670- doPark(parkTimeNs)
675+ val spins = this .spins // volatile read
676+ if (spins <= MAX_YIELDS ) {
677+ this .spins = spins + 1 // volatile write
678+ if (spins >= MAX_SPINS ) yield ()
679+ } else {
680+ if (parkTimeNs < MAX_PARK_TIME_NS ) {
681+ parkTimeNs = (parkTimeNs * 3 ushr 1 ).coerceAtMost(MAX_PARK_TIME_NS )
671682 }
683+ tryReleaseCpu(WorkerState .PARKING )
684+ doPark(parkTimeNs.toLong())
672685 }
673686 }
674687
@@ -712,12 +725,14 @@ internal class CoroutineScheduler(
712725 * Now move last worker into an index in array that was previously occupied by this worker.
713726 */
714727 val lastWorkerIndex = decrementCreatedWorkers()
715- val worker = workers[lastWorkerIndex]!!
716- workers[indexInArray] = worker
717- worker .indexInArray = indexInArray
728+ val lastWorker = workers[lastWorkerIndex]!!
729+ workers[indexInArray] = lastWorker
730+ lastWorker .indexInArray = indexInArray
718731 workers[lastWorkerIndex] = null
732+ // Cleanup index of this worker for debugging purposes
733+ indexInArray = - 1
719734 }
720- state = WorkerState .FINISHED
735+ state = WorkerState .TERMINATED
721736 }
722737
723738 /* *
@@ -741,14 +756,12 @@ internal class CoroutineScheduler(
741756 state = WorkerState .BLOCKING
742757 parkTimeNs = MIN_PARK_TIME_NS
743758 }
744- yields = 0
745759 spins = 0
746760 }
747761
748762 // It is invoked by other thread before this worker is unparked
749763 fun idleResetBeforeUnpark () {
750764 parkTimeNs = MIN_PARK_TIME_NS
751- yields = 0
752765 spins = 0 // Volatile write, should be written last
753766 }
754767
@@ -816,6 +829,6 @@ internal class CoroutineScheduler(
816829 /* *
817830 * Terminal state, will no longer be used
818831 */
819- FINISHED
832+ TERMINATED
820833 }
821834}
0 commit comments