Skip to content

Commit c22a1c7

Browse files
committed
LockFreeMPSCQueue optimized & fixed
* no need for REMOVE token to hurry up GC * Placeholder logic is fixed to avoid potential data loss on wraparound * Stress test for resize/copy added
1 parent 7c99a99 commit c22a1c7

File tree

2 files changed

+129
-17
lines changed

2 files changed

+129
-17
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPSCQueue.kt

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,10 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
122122
}
123123

124124
private fun fillPlaceholder(index: Int, element: E): Core<E>? {
125-
if (array.compareAndSet(index and mask, PLACEHOLDER, element)) {
125+
val old = array.get(index and mask)
126+
if (old is Placeholder && old.index == index) {
127+
// that is OUR placeholder and only we can fill it in
128+
array.set(index and mask, element)
126129
// we've corrected missing element, should check if that propagated to further copies, just in case
127130
return this
128131
}
@@ -137,15 +140,10 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
137140
if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
138141
state.withState { head, tail ->
139142
if ((tail and mask) == (head and mask)) return null // empty
140-
// because queue is Single Consumer, then element == null|PLACEHOLDER can only be when add has not finished yet
143+
// because queue is Single Consumer, then element == null|Placeholder can only be when add has not finished yet
141144
val element = array[head and mask] ?: return null
142-
if (element === PLACEHOLDER) return null // same story -- consider it not added yet
143-
check(element !== REMOVED) { "This queue can have only one consumer" }
144-
// tentatively remove element to let GC work
145-
// we cannot put null into array, because copying thread could replace it with PLACEHOLDER
146-
// and that is a disaster, so a separate REMOVED token is used here.
147-
// Note: at most one REMOVED in the array, because single consumer.
148-
array[head and mask] = REMOVED
145+
if (element is Placeholder) return null // same story -- consider it not added yet
146+
// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
149147
val newHead = (head + 1) and MAX_CAPACITY_MASK
150148
if (_state.compareAndSet(state, state.updateHead(newHead))) {
151149
array[head and mask] = null // now can safely put null (state was updated)
@@ -165,10 +163,8 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
165163
state.withState { head, _ ->
166164
check(head == oldHead) { "This queue can have only one consumer" }
167165
if (state and FROZEN_MASK != 0L) {
168-
// state was already frozen, so either old or REMOVED item could have been copied to next
169-
val next = next()
170-
next.array[head and next.mask] = REMOVED // make sure it is removed in new array regardless
171-
return next // continue to correct head in next
166+
// state was already frozen, so removed element was copied to next
167+
return next() // continue to correct head in next
172168
}
173169
if (_state.compareAndSet(state, state.updateHead(newHead))) {
174170
array[head and mask] = null // now can safely put null (state was updated)
@@ -199,14 +195,19 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
199195
var index = head
200196
while (index and mask != tail and mask) {
201197
// replace nulls with placeholders on copy
202-
next.array[index and next.mask] = array[index and mask] ?: PLACEHOLDER
198+
next.array[index and next.mask] = array[index and mask] ?: Placeholder(index)
203199
index++
204200
}
205201
next._state.value = state wo FROZEN_MASK
206202
}
207203
return next
208204
}
209205

206+
// Instance of this class is placed into array when we have to copy array, but addLast is in progress --
207+
// it had already reserved a slot in the array (with null) and have not yet put its value there.
208+
// Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
209+
private class Placeholder(@JvmField val index: Int)
210+
210211
@Suppress("PrivatePropertyName")
211212
internal companion object {
212213
internal const val INITIAL_CAPACITY = 8
@@ -229,9 +230,6 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
229230
internal const val ADD_FROZEN = 1
230231
internal const val ADD_CLOSED = 2
231232

232-
private val PLACEHOLDER = Symbol("PLACEHOLDER")
233-
private val REMOVED = Symbol("PLACEHOLDER")
234-
235233
private infix fun Long.wo(other: Long) = this and other.inv()
236234
private fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)
237235
private fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.internal
18+
19+
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.*
21+
import java.util.concurrent.*
22+
import kotlin.concurrent.*
23+
import kotlin.test.*
24+
25+
// Tests many short queues to stress copy/resize
26+
class LockFreeMPSCQueueStressTest : TestBase() {
27+
private val nSeconds = 3 * stressTestMultiplier
28+
private val nProducers = 4
29+
private val batchSize = 100
30+
31+
private val batch = atomic(0)
32+
private val produced = atomic(0L)
33+
private val consumed = atomic(0L)
34+
private var expected = LongArray(nProducers)
35+
36+
private var queue = atomic<LockFreeMPSCQueue<Item>?>(null)
37+
private val done = atomic(0)
38+
private val doneProducers = atomic(0)
39+
40+
private val barrier = CyclicBarrier(nProducers + 2)
41+
42+
private class Item(val producer: Int, val index: Long)
43+
44+
@Test
45+
fun testStress() {
46+
val threads = mutableListOf<Thread>()
47+
threads += thread(name = "Pacer", start = false) {
48+
while (done.value == 0) {
49+
queue.value = LockFreeMPSCQueue()
50+
batch.value = 0
51+
doneProducers.value = 0
52+
barrier.await() // start consumers & producers
53+
barrier.await() // await consumers & producers
54+
}
55+
queue.value = null
56+
println("Pacer done")
57+
barrier.await() // wakeup the rest
58+
}
59+
threads += thread(name = "Consumer", start = false) {
60+
while (true) {
61+
barrier.await()
62+
val queue = queue.value ?: break
63+
while (true) {
64+
val item = queue.removeFirstOrNull()
65+
if (item == null) {
66+
if (doneProducers.value == nProducers && queue.isEmpty) break // that's it
67+
continue // spin to retry
68+
}
69+
consumed.incrementAndGet()
70+
val eItem = expected[item.producer]++
71+
if (eItem != item.index) error("Expected $eItem but got ${item.index} from Producer-${item.producer}")
72+
}
73+
barrier.await()
74+
}
75+
println("Consumer done")
76+
}
77+
val producers = List(nProducers) { producer ->
78+
thread(name = "Producer-$producer", start = false) {
79+
var index = 0L
80+
while (true) {
81+
barrier.await()
82+
val queue = queue.value ?: break
83+
while (true) {
84+
if (batch.incrementAndGet() >= batchSize) break
85+
check(queue.addLast(Item(producer, index++))) // never closed
86+
produced.incrementAndGet()
87+
}
88+
doneProducers.incrementAndGet()
89+
barrier.await()
90+
}
91+
println("Producer-$producer done")
92+
}
93+
}
94+
threads += producers
95+
threads.forEach {
96+
it.setUncaughtExceptionHandler { t, e ->
97+
System.err.println("Thread $t failed: $e")
98+
e.printStackTrace()
99+
done.value = 1
100+
error("Thread $t failed", e)
101+
}
102+
}
103+
threads.forEach { it.start() }
104+
for (second in 1..nSeconds) {
105+
Thread.sleep(1000)
106+
println("$second: produced=${produced.value}, consumed=${consumed.value}")
107+
if (done.value == 1) break
108+
}
109+
done.value = 1
110+
threads.forEach { it.join() }
111+
println("T: produced=${produced.value}, consumed=${consumed.value}")
112+
assertEquals(produced.value, consumed.value)
113+
}
114+
}

0 commit comments

Comments
 (0)