Skip to content

Commit 1d3b019

Browse files
elizarovqwwdfsad
authored andcommitted
Improved management of parkedWorkersStack in terms of ABA/concurrency;
Also includes documentation, naming and packaging fixes
1 parent 7067109 commit 1d3b019

File tree

12 files changed

+420
-336
lines changed

12 files changed

+420
-336
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental.internal
6+
7+
import kotlinx.atomicfu.*
8+
9+
internal open class LockFreeMPMCQueueNode<T> {
10+
val next = atomic<T?>(null)
11+
12+
@PublishedApi internal val nextValue: T? get() = next.value
13+
}
14+
15+
/*
16+
* Michael & Scott lock-free Multi-Producer Multi-Consumer Queue with support for poll with predicate.
17+
*
18+
* @suppress **This is unstable API and it is subject to change.**
19+
*/
20+
internal open class LockFreeMPMCQueue<T : LockFreeMPMCQueueNode<T>> {
21+
private val head =
22+
@Suppress("UNCHECKED_CAST")
23+
atomic(LockFreeMPMCQueueNode<T>() as T) // sentinel
24+
25+
private val tail = atomic(head.value)
26+
27+
public fun addLast(node: T): Boolean {
28+
tail.loop { curTail ->
29+
val curNext = curTail.next.value
30+
if (curNext != null) {
31+
tail.compareAndSet(curTail, curNext)
32+
return@loop // retry
33+
}
34+
if (curTail.next.compareAndSet(null, node)) {
35+
tail.compareAndSet(curTail, node)
36+
return true
37+
}
38+
}
39+
}
40+
41+
public fun removeFistOrNull(): T? {
42+
head.loop { curHead ->
43+
val next = curHead.next.value ?: return null
44+
if (head.compareAndSet(curHead, next)) {
45+
return next
46+
}
47+
}
48+
}
49+
50+
@PublishedApi internal val headValue: T get() = head.value
51+
@PublishedApi internal fun headCas(curHead: T, update: T) = head.compareAndSet(curHead, update)
52+
53+
public inline fun removeFistOrNullIf(predicate: (T) -> Boolean): T? {
54+
while (true) {
55+
val curHead = headValue
56+
val next = curHead.nextValue ?: return null
57+
if (!predicate(next)) return null
58+
if (headCas(curHead, next)) {
59+
return next
60+
}
61+
}
62+
}
63+
64+
public fun isEmpty(): Boolean = size == 0
65+
66+
public val size: Int get() = fold(0) { acc, _ -> acc + 1 }
67+
68+
public inline fun <R> fold(initial: R, operation: (acc: R, T) -> R): R {
69+
var acc = initial
70+
var cur = headValue
71+
while (true) {
72+
val next = cur.nextValue ?: break
73+
acc = operation(acc, next)
74+
cur = next
75+
}
76+
return acc
77+
}
78+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt

Lines changed: 248 additions & 217 deletions
Large diffs are not rendered by default.

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@ import java.io.*
66
import java.util.concurrent.*
77
import kotlin.coroutines.experimental.*
88

9+
/**
10+
* @suppress **This is unstable API and it is subject to change.**
11+
*/
912
// TODO make internal after integration wih Ktor
10-
class ExperimentalCoroutineDispatcher(corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
13+
class ExperimentalCoroutineDispatcher(
14+
corePoolSize: Int = CORE_POOL_SIZE,
15+
maxPoolSize: Int = MAX_POOL_SIZE
16+
) : CoroutineDispatcher(), Delay, Closeable {
1117

1218
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
1319

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/LockFreeQueue.kt

Lines changed: 0 additions & 72 deletions
This file was deleted.

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package kotlinx.coroutines.experimental.scheduling
22

3+
import kotlinx.coroutines.experimental.internal.*
34
import java.util.concurrent.*
45

56

6-
internal typealias Task = TimedTask
7-
internal typealias GlobalQueue = TaskQueue
87
// TODO most of these fields will be moved to 'object ExperimentalDispatcher'
98

109
// 100us as default
@@ -21,12 +20,12 @@ internal val BLOCKING_DEFAULT_PARALLELISM = readFromSystemProperties(
2120
"kotlinx.coroutines.scheduler.blocking.parallelism", 16)
2221

2322
@JvmField
24-
internal val CORE_POOL_SIZE = readFromSystemProperties(
25-
"kotlinx.coroutines.scheduler.core.pool.size", Runtime.getRuntime().availableProcessors().coerceAtLeast(2))
23+
internal val CORE_POOL_SIZE = readFromSystemProperties("kotlinx.coroutines.scheduler.core.pool.size",
24+
Runtime.getRuntime().availableProcessors().coerceAtLeast(2))
2625

2726
@JvmField
28-
internal val MAX_POOL_SIZE = readFromSystemProperties(
29-
"kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128)
27+
internal val MAX_POOL_SIZE = readFromSystemProperties("kotlinx.coroutines.scheduler.max.pool.size",
28+
(Runtime.getRuntime().availableProcessors() * 128).coerceIn(CORE_POOL_SIZE, CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE))
3029

3130
@JvmField
3231
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(readFromSystemProperties(
@@ -42,7 +41,18 @@ internal enum class TaskMode {
4241
PROBABLY_BLOCKING,
4342
}
4443

45-
internal data class TimedTask(val task: Runnable, val submissionTime: Long, val mode: TaskMode)
44+
internal data class Task(
45+
val block: Runnable,
46+
val submissionTime: Long,
47+
val mode: TaskMode
48+
) : LockFreeMPMCQueueNode<Task>()
49+
50+
// Open for tests
51+
internal open class GlobalQueue : LockFreeMPMCQueue<Task>() {
52+
// Open for tests
53+
public open fun removeFirstBlockingModeOrNull(): Task? =
54+
removeFistOrNullIf { it.mode == TaskMode.PROBABLY_BLOCKING }
55+
}
4656

4757
internal abstract class TimeSource {
4858
abstract fun nanoTime(): Long

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,25 @@ internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE
88
internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
99

1010
/**
11-
* Unstable API and subject to change.
1211
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
13-
* At any moment queue is used only by [CoroutineScheduler.PoolWorker] threads, has only one producer (worker owning this queue)
12+
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
1413
* and any amount of consumers, other pool workers which are trying to steal work.
1514
*
16-
* Fairness
15+
* ### Fairness
16+
*
1717
* [WorkQueue] provides semi-FIFO order, but with priority for most recently submitted task assuming
1818
* that these two (current one and submitted) are communicating and sharing state thus making such communication extremely fast.
1919
* E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order.
2020
*
21-
* Work offloading
21+
* ### Work offloading
22+
*
2223
* When the queue is full, half of existing tasks are offloaded to global queue which is regularly polled by other pool workers.
2324
* Offloading occurs in LIFO order for the sake of implementation simplicity: offloads should be extremely rare and occurs only in specific use-cases
2425
* (e.g. when coroutine starts heavy fork-join-like computation), so fairness is not important.
25-
* As an alternative, offloading directly to some [CoroutineScheduler.PoolWorker] may be used, but then the strategy of selecting any idle worker
26+
* As an alternative, offloading directly to some [CoroutineScheduler.Worker] may be used, but then the strategy of selecting any idle worker
2627
* should be implemented and implementation should be aware multiple producers.
28+
*
29+
* @suppress **This is unstable API and it is subject to change.**
2730
*/
2831
internal class WorkQueue {
2932

@@ -54,9 +57,8 @@ internal class WorkQueue {
5457
* Retrieves and removes task from the head of the queue
5558
* Invariant: this method is called only by the owner of the queue ([pollExternal] is not)
5659
*/
57-
fun poll(): Task? {
58-
return lastScheduledTask.getAndSet(null) ?: pollExternal()
59-
}
60+
fun poll(): Task? =
61+
lastScheduledTask.getAndSet(null) ?: pollExternal()
6062

6163
/**
6264
* Invariant: this method is called only by the owner of the queue
@@ -70,20 +72,18 @@ internal class WorkQueue {
7072
return addLast(previous, globalQueue)
7173
}
7274

73-
// Called only by the owner
75+
// Called only by the owner, returns true if no offloading happened, false otherwise
7476
fun addLast(task: Task, globalQueue: GlobalQueue): Boolean {
75-
var addedToGlobalQueue = false
76-
77+
var noOffloadingHappened = true
7778
/*
7879
* We need the loop here because race possible not only on full queue,
7980
* but also on queue with one element during stealing
8081
*/
8182
while (!tryAddLast(task)) {
8283
offloadWork(globalQueue)
83-
addedToGlobalQueue = true
84+
noOffloadingHappened = false
8485
}
85-
86-
return !addedToGlobalQueue
86+
return noOffloadingHappened
8787
}
8888

8989
/**
@@ -135,7 +135,7 @@ internal class WorkQueue {
135135
private fun offloadWork(target: GlobalQueue) {
136136
repeat((bufferSize / 2).coerceAtLeast(1)) {
137137
val task = pollExternal() ?: return
138-
target.add(task)
138+
target.addLast(task)
139139
}
140140
}
141141

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.experimental.internal
6+
7+
import kotlinx.coroutines.experimental.*
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
class LockFreeMPMCQueueTest : TestBase() {
12+
@Test
13+
fun testBasic() {
14+
val q = LockFreeMPMCQueue<Node>()
15+
assertEquals(null, q.removeFistOrNull())
16+
assertTrue(q.isEmpty())
17+
q.addLast(Node(1))
18+
assertEquals(1, q.size)
19+
assertEquals(Node(1), q.removeFistOrNull())
20+
assertEquals(null, q.removeFistOrNull())
21+
assertTrue(q.isEmpty())
22+
}
23+
24+
private data class Node(val v: Int) : LockFreeMPMCQueueNode<Node>()
25+
}

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class CoroutineDispatcherTest : SchedulerTestBase() {
1818
fun testSingleThread() = runBlocking {
1919
expect(1)
2020
withContext(dispatcher) {
21-
require(Thread.currentThread() is CoroutineScheduler.PoolWorker)
21+
require(Thread.currentThread() is CoroutineScheduler.Worker)
2222
expect(2)
2323
val job = async(coroutineContext) {
2424
expect(3)

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ class CoroutineSchedulerTest : TestBase() {
8686

8787
@Test
8888
fun testRngUniformDistribution() {
89-
CoroutineScheduler(1).use { scheduler ->
90-
val worker = scheduler.PoolWorker(1)
89+
CoroutineScheduler(1, 128).use { scheduler ->
90+
val worker = scheduler.Worker(1)
9191
testUniformDistribution(worker, 2)
9292
testUniformDistribution(worker, 4)
9393
testUniformDistribution(worker, 8)
@@ -111,7 +111,7 @@ class CoroutineSchedulerTest : TestBase() {
111111
ExperimentalCoroutineDispatcher(4, 1)
112112
}
113113

114-
private fun testUniformDistribution(worker: CoroutineScheduler.PoolWorker, bound: Int) {
114+
private fun testUniformDistribution(worker: CoroutineScheduler.Worker, bound: Int) {
115115
val result = IntArray(bound)
116116
val iterations = 10_000_000
117117
repeat(iterations) {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,22 @@ abstract class SchedulerTestBase : TestBase() {
3232
* Asserts that any number of pool worker threads in [range] exists at the time of method invocation
3333
*/
3434
fun checkPoolThreadsExist(range: IntRange) {
35-
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.PoolWorker }.count()
35+
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }.count()
3636
require(threads in range) { "Expected threads in $range interval, but has $threads" }
3737
}
3838

3939
/**
4040
* Asserts that [expectedThreadsCount] of pool worker threads exists at the time of method invocation
4141
*/
4242
fun checkPoolThreadsExist(expectedThreadsCount: Int = CORES_COUNT) {
43-
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.PoolWorker }.count()
43+
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }.count()
4444
require(threads == expectedThreadsCount) { "Expected $expectedThreadsCount threads, but has $threads" }
4545
}
4646

4747
fun initialPoolSize() = Runtime.getRuntime().availableProcessors().coerceAtMost(2)
4848

4949
private fun maxSequenceNumber(): Int? {
50-
return Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.PoolWorker }
50+
return Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }
5151
.map { sequenceNumber(it.name) }.max()
5252
}
5353

0 commit comments

Comments
 (0)