55package kotlinx.coroutines.internal
66
77import kotlinx.atomicfu.*
8+ import java.util.*
89import java.util.concurrent.atomic.*
910
10- private typealias Core <E > = LockFreeMPSCQueueCore <E >
11+ private typealias Core <E > = LockFreeTaskQueueCore <E >
1112
1213/* *
13- * Lock-free Multiply-Producer Single-Consumer Queue.
14- * *Note: This queue is NOT linearizable. It provides only quiescent consistency for its operations.*
14+ * Lock-free Multiply-Producer xxx-Consumer Queue for task scheduling purposes.
1515 *
16+ * **Note 1: This queue is NOT linearizable. It provides only quiescent consistency for its operations.**
17+ * However, this guarantee is strong enough for task-scheduling purposes.
1618 * In particular, the following execution is permitted for this queue, but is not permitted for a linearizable queue:
1719 *
1820 * ```
1921 * Thread 1: addLast(1) = true, removeFirstOrNull() = null
2022 * Thread 2: addLast(2) = 2 // this operation is concurrent with both operations in the first thread
2123 * ```
24+ *
25+ * **Note 2: When this queue is used with multiple consumers (`singleConsumer == false`) this it is NOT lock-free.**
26+ * In particular, consumer spins until producer finishes its operation in the case of near-empty queue.
27+ * It is a very short window that could manifest itself rarely and only under specific load conditions,
28+ * but it still deprives this algorithm of its lock-freedom.
2229 */
23- internal class LockFreeMPSCQueue <E : Any > {
24- private val _cur = atomic(Core <E >(Core .INITIAL_CAPACITY ))
30+ internal open class LockFreeTaskQueue <E : Any >(
31+ singleConsumer : Boolean // true when there is only a single consumer (slightly faster & lock-free)
32+ ) {
33+ private val _cur = atomic(Core <E >(Core .INITIAL_CAPACITY , singleConsumer))
2534
2635 // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
2736 val isEmpty: Boolean get() = _cur .value.isEmpty
37+ val size: Int get() = _cur .value.size
2838
2939 fun close () {
3040 _cur .loop { cur ->
@@ -44,22 +54,29 @@ internal class LockFreeMPSCQueue<E : Any> {
4454 }
4555
4656 @Suppress(" UNCHECKED_CAST" )
47- fun removeFirstOrNull (): E ? {
57+ fun removeFirstOrNull (): E ? = removeFirstOrNullIf { true }
58+
59+ @Suppress(" UNCHECKED_CAST" )
60+ inline fun removeFirstOrNullIf (predicate : (E ) -> Boolean ): E ? {
4861 _cur .loop { cur ->
49- val result = cur.removeFirstOrNull( )
62+ val result = cur.removeFirstOrNullIf(predicate )
5063 if (result != = Core .REMOVE_FROZEN ) return result as E ?
5164 _cur .compareAndSet(cur, cur.next())
5265 }
5366 }
67+
68+ // Used for validation in tests only
69+ fun <R > map (transform : (E ) -> R ): List <R > = _cur .value.map(transform)
5470}
5571
5672/* *
57- * Lock-free Multiply-Producer Single-Consumer Queue core.
58- * *Note: This queue is NOT linearizable. It provides only quiescent consistency for its operations.*
59- *
60- * @see LockFreeMPSCQueue
73+ * Lock-free Multiply-Producer xxx-Consumer Queue core.
74+ * @see LockFreeTaskQueue
6175 */
62- internal class LockFreeMPSCQueueCore <E : Any >(private val capacity : Int ) {
76+ internal class LockFreeTaskQueueCore <E : Any >(
77+ private val capacity : Int ,
78+ private val singleConsumer : Boolean // true when there is only a single consumer (slightly faster)
79+ ) {
6380 private val mask = capacity - 1
6481 private val _next = atomic<Core <E >? > (null )
6582 private val _state = atomic(0L )
@@ -72,6 +89,7 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
7289
7390 // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
7491 val isEmpty: Boolean get() = _state .value.withState { head, tail -> head == tail }
92+ val size: Int get() = _state .value.withState { head, tail -> (tail - head) and MAX_CAPACITY_MASK }
7593
7694 fun close (): Boolean {
7795 _state .update { state ->
@@ -87,9 +105,23 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
87105 _state .loop { state ->
88106 if (state and (FROZEN_MASK or CLOSED_MASK ) != 0L ) return state.addFailReason() // cannot add
89107 state.withState { head, tail ->
90- // there could be one REMOVE element beyond head that we cannot stump up ,
108+ // If queue is Single-Consumer than there could be one element beyond head that we cannot overwrite ,
91109 // so we check for full queue with an extra margin of one element
92110 if ((tail + 2 ) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
111+ // If queue is Multi-Consumer then the consumer could still have not cleared element
112+ // despite the above check for one free slot.
113+ if (! singleConsumer && array[tail and mask] != null ) {
114+ // There are two options in this situation
115+ // 1. Spin-wait until consumer clears the slot
116+ // 2. Freeze & resize to avoid spinning
117+ // We use heuristic here to avoid memory-overallocation
118+ // Freeze & reallocate when queue is small or more than half of the queue is used
119+ if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1 ) {
120+ return ADD_FROZEN
121+ }
122+ // otherwise spin
123+ return @loop
124+ }
93125 val newTail = (tail + 1 ) and MAX_CAPACITY_MASK
94126 if (_state .compareAndSet(state, state.updateTail(newTail))) {
95127 // successfully added
@@ -127,23 +159,38 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
127159 return null
128160 }
129161
130- // SINGLE CONSUMER
131162 // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
132- fun removeFirstOrNull (): Any? {
163+ fun removeFirstOrNull (): Any? = removeFirstOrNullIf { true }
164+
165+ // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
166+ inline fun removeFirstOrNullIf (predicate : (E ) -> Boolean ): Any? {
133167 _state .loop { state ->
134168 if (state and FROZEN_MASK != 0L ) return REMOVE_FROZEN // frozen -- cannot modify
135169 state.withState { head, tail ->
136170 if ((tail and mask) == (head and mask)) return null // empty
137- // because queue is Single Consumer, then element == null|Placeholder can only be when add has not finished yet
138- val element = array[head and mask] ? : return null
139- if (element is Placeholder ) return null // same story -- consider it not added yet
171+ val element = array[head and mask]
172+ if (element == null ) {
173+ // If queue is Single-Consumer, then element == null only when add has not finished yet
174+ if (singleConsumer) return null // consider it not added yet
175+ // retry (spin) until consumer adds it
176+ return @loop
177+ }
178+ // element == Placeholder can only be when add has not finished yet
179+ if (element is Placeholder ) return null // consider it not added yet
180+ // now we tentative know element to remove -- check predicate
181+ @Suppress(" UNCHECKED_CAST" )
182+ if (! predicate(element as E )) return null
140183 // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
141184 val newHead = (head + 1 ) and MAX_CAPACITY_MASK
142185 if (_state .compareAndSet(state, state.updateHead(newHead))) {
186+ // Array could have been copied by another thread and it is perfectly fine, since only elements
187+ // between head and tail were copied and there are no extra steps we should take here
143188 array[head and mask] = null // now can safely put null (state was updated)
144189 return element // successfully removed in fast-path
145190 }
146- // Slow-path for remove in case of interference
191+ // Multi-Consumer queue must retry this loop on CAS failure (another consumer might have removed element)
192+ if (! singleConsumer) return @loop
193+ // Single-consumer queue goes to slow-path for remove in case of interference
147194 var cur = this
148195 while (true ) {
149196 @Suppress(" UNUSED_VALUE" )
@@ -169,7 +216,7 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
169216 }
170217 }
171218
172- fun next (): LockFreeMPSCQueueCore <E > = allocateOrGetNextCopy(markFrozen())
219+ fun next (): LockFreeTaskQueueCore <E > = allocateOrGetNextCopy(markFrozen())
173220
174221 private fun markFrozen (): Long =
175222 _state .updateAndGet { state ->
@@ -185,7 +232,7 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
185232 }
186233
187234 private fun allocateNextCopy (state : Long ): Core <E > {
188- val next = LockFreeMPSCQueueCore <E >(capacity * 2 )
235+ val next = LockFreeTaskQueueCore <E >(capacity * 2 , singleConsumer )
189236 state.withState { head, tail ->
190237 var index = head
191238 while (index and mask != tail and mask) {
@@ -198,44 +245,64 @@ internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
198245 return next
199246 }
200247
248+ // Used for validation in tests only
249+ fun <R > map (transform : (E ) -> R ): List <R > {
250+ val res = ArrayList <R >(array.length())
251+ _state .value.withState { head, tail ->
252+ var index = head
253+ while (index and mask != tail and mask) {
254+ // replace nulls with placeholders on copy
255+ val element = array[index and mask]
256+ @Suppress(" UNCHECKED_CAST" )
257+ if (element != null && element !is Placeholder ) res.add(transform(element as E ))
258+ index++
259+ }
260+ }
261+ return res
262+ }
263+
264+
201265 // Instance of this class is placed into array when we have to copy array, but addLast is in progress --
202266 // it had already reserved a slot in the array (with null) and have not yet put its value there.
203267 // Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
204- private class Placeholder (@JvmField val index : Int )
268+ // Internal because of inlining
269+ internal class Placeholder (@JvmField val index : Int )
205270
206- @Suppress(" PrivatePropertyName" )
271+ @Suppress(" PrivatePropertyName" , " MemberVisibilityCanBePrivate " )
207272 internal companion object {
208- internal const val INITIAL_CAPACITY = 8
273+ const val INITIAL_CAPACITY = 8
274+
275+ const val CAPACITY_BITS = 30
276+ const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS ) - 1
277+ const val HEAD_SHIFT = 0
278+ const val HEAD_MASK = MAX_CAPACITY_MASK .toLong() shl HEAD_SHIFT
279+ const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
280+ const val TAIL_MASK = MAX_CAPACITY_MASK .toLong() shl TAIL_SHIFT
209281
210- private const val CAPACITY_BITS = 30
211- private const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS ) - 1
212- private const val HEAD_SHIFT = 0
213- private const val HEAD_MASK = MAX_CAPACITY_MASK .toLong() shl HEAD_SHIFT
214- private const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
215- private const val TAIL_MASK = MAX_CAPACITY_MASK .toLong() shl TAIL_SHIFT
282+ const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
283+ const val FROZEN_MASK = 1L shl FROZEN_SHIFT
284+ const val CLOSED_SHIFT = FROZEN_SHIFT + 1
285+ const val CLOSED_MASK = 1L shl CLOSED_SHIFT
216286
217- private const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
218- private const val FROZEN_MASK = 1L shl FROZEN_SHIFT
219- private const val CLOSED_SHIFT = FROZEN_SHIFT + 1
220- private const val CLOSED_MASK = 1L shl CLOSED_SHIFT
287+ const val MIN_ADD_SPIN_CAPACITY = 1024
221288
222- @JvmField internal val REMOVE_FROZEN = Symbol (" REMOVE_FROZEN" )
289+ @JvmField val REMOVE_FROZEN = Symbol (" REMOVE_FROZEN" )
223290
224- internal const val ADD_SUCCESS = 0
225- internal const val ADD_FROZEN = 1
226- internal const val ADD_CLOSED = 2
291+ const val ADD_SUCCESS = 0
292+ const val ADD_FROZEN = 1
293+ const val ADD_CLOSED = 2
227294
228- private infix fun Long.wo (other : Long ) = this and other.inv ()
229- private fun Long.updateHead (newHead : Int ) = (this wo HEAD_MASK ) or (newHead.toLong() shl HEAD_SHIFT )
230- private fun Long.updateTail (newTail : Int ) = (this wo TAIL_MASK ) or (newTail.toLong() shl TAIL_SHIFT )
295+ infix fun Long.wo (other : Long ) = this and other.inv ()
296+ fun Long.updateHead (newHead : Int ) = (this wo HEAD_MASK ) or (newHead.toLong() shl HEAD_SHIFT )
297+ fun Long.updateTail (newTail : Int ) = (this wo TAIL_MASK ) or (newTail.toLong() shl TAIL_SHIFT )
231298
232- private inline fun <T > Long.withState (block : (head: Int , tail: Int ) -> T ): T {
299+ inline fun <T > Long.withState (block : (head: Int , tail: Int ) -> T ): T {
233300 val head = ((this and HEAD_MASK ) shr HEAD_SHIFT ).toInt()
234301 val tail = ((this and TAIL_MASK ) shr TAIL_SHIFT ).toInt()
235302 return block(head, tail)
236303 }
237304
238305 // FROZEN | CLOSED
239- private fun Long.addFailReason (): Int = if (this and CLOSED_MASK != 0L ) ADD_CLOSED else ADD_FROZEN
306+ fun Long.addFailReason (): Int = if (this and CLOSED_MASK != 0L ) ADD_CLOSED else ADD_FROZEN
240307 }
241308}
0 commit comments