Skip to content

Commit 1c5ae48

Browse files
authored
Fix oversubscription in Dispatchers.Default (#3684)
Previously, an arbitrary task was first added to the scheduler's queue and only afterwards was the corresponding blocking-tasks counter incremented. The executing thread decremented the counter as soon as the task was done, meaning that, with unlucky timing, the decrement could happen before the increment and the overall balance of blocking tasks could become negative. The second part of the equation is that all increments are atomic (rather than CAS-based), meaning that, due to programming bugs like the previous one, it is easy to overstep the boundary of adjacent masks, leaving the control state inconsistent. The fix is trivial: increment the counter before publishing the task, and do not reset it if the scheduler is closed and the task wasn't added. Fixes #3642
1 parent 33b2a9a commit 1c5ae48

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,13 @@ internal class CoroutineScheduler(
265265

266266
/**
267267
* The `Long` value describing the state of workers in this pool.
268-
* Currently includes created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits.
268+
* Currently, it includes created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits.
269+
*
270+
* State layout (highest to lowest bits):
271+
* | --- number of cpu permits, 22 bits --- | --- number of blocking tasks, 21 bits --- | --- number of created threads, 21 bits --- |
269272
*/
270273
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
274+
271275
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
272276
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
273277

@@ -383,6 +387,10 @@ internal class CoroutineScheduler(
383387
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
384388
trackTask() // this is needed for virtual time support
385389
val task = createTask(block, taskContext)
390+
val isBlockingTask = task.isBlocking
391+
// Invariant: we increment counter **before** publishing the task
392+
// so executing thread can safely decrement the number of blocking tasks
393+
val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
386394
// try to submit the task to the local queue and act depending on the result
387395
val currentWorker = currentWorker()
388396
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
@@ -394,12 +402,12 @@ internal class CoroutineScheduler(
394402
}
395403
val skipUnpark = tailDispatch && currentWorker != null
396404
// Checking 'task' instead of 'notAdded' is completely okay
397-
if (task.mode == TASK_NON_BLOCKING) {
405+
if (isBlockingTask) {
406+
// Use state snapshot to better estimate the number of running threads
407+
signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
408+
} else {
398409
if (skipUnpark) return
399410
signalCpuWork()
400-
} else {
401-
// Increment blocking tasks anyway
402-
signalBlockingWork(skipUnpark = skipUnpark)
403411
}
404412
}
405413

@@ -413,11 +421,11 @@ internal class CoroutineScheduler(
413421
return TaskImpl(block, nanoTime, taskContext)
414422
}
415423

416-
private fun signalBlockingWork(skipUnpark: Boolean) {
417-
// Use state snapshot to avoid thread overprovision
418-
val stateSnapshot = incrementBlockingTasks()
424+
// NB: should only be called from 'dispatch' method due to blocking tasks increment
425+
private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
419426
if (skipUnpark) return
420427
if (tryUnpark()) return
428+
// Use state snapshot to avoid accidental thread overprovision
421429
if (tryCreateWorker(stateSnapshot)) return
422430
tryUnpark() // Try unpark again in case there was race between permit release and parking
423431
}

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ class CoroutineSchedulerInternalApiStressTest : TestBase() {
8080
}
8181
}
8282
completionLatch.countDown()
83-
// assertEquals(100, timesHelped)
84-
// assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString())
83+
assertEquals(100, timesHelped)
84+
assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString())
8585
}
8686
}
8787
}

0 commit comments

Comments
 (0)