1+ /*
2+ * Copyright 2016-2017 JetBrains s.r.o.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package kotlinx.coroutines.experimental
18+
19+ import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20+ import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21+ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
22+
23+ /* *
24+ * Mutual exclusion for coroutines.
25+ *
26+ * Mutex has two states: _locked_ and _unlocked_.
27+ * It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
28+ * the lock still suspends the invoker.
29+ *
30+ * @param locked initial state of the mutex
31+ */
32+ public class Mutex (locked : Boolean = false ) {
33+ // State is: Empty | UnlockOp | LockFreeLinkedListHead (queue of Waiter objects)
34+ @Volatile
35+ private var state: Any? = if (locked) EmptyLocked else EmptyUnlocked // shared objects while we have no waiters
36+
37+ private companion object {
38+ @JvmStatic
39+ private val STATE : AtomicReferenceFieldUpdater <Mutex , Any ?> =
40+ AtomicReferenceFieldUpdater .newUpdater(Mutex ::class .java, Any ::class .java, " state" )
41+
42+ @JvmStatic
43+ private val EmptyLocked = Empty (true )
44+
45+ @JvmStatic
46+ private val EmptyUnlocked = Empty (false )
47+ }
48+
49+ /* *
50+ * Tries to lock this mutex, returning `false` if this mutex is already locked.
51+ */
52+ public fun tryLock (): Boolean {
53+ while (true ) { // lock-free loop on state
54+ val state = this .state
55+ when (state) {
56+ is Empty -> {
57+ if (state.locked) return false
58+ if (STATE .compareAndSet(this , state, EmptyLocked )) return true
59+ }
60+ is UnlockOp -> state.helpComplete() // help
61+ else -> return false
62+ }
63+ }
64+ }
65+
66+ /* *
67+ * Locks this mutex, suspending caller while the mutex is locked.
68+ *
69+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
70+ * function is suspended, this function immediately resumes with [CancellationException].
71+ * Cancellation of suspended lock invocation is *atomic* -- when this function
72+ * throws [CancellationException] it means that the mutex was not locked.
73+ *
74+ * Note, that this function does not check for cancellation when it is not suspended.
75+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
76+ */
77+ public suspend fun lock () {
78+ // fast-path -- try lock
79+ if (tryLock()) return
80+ // slow-path -- suspend
81+ return lockSuspend()
82+ }
83+
84+ private suspend fun lockSuspend () = suspendCancellableCoroutine<Unit >(holdCancellability = true ) sc@ { cont ->
85+ val waiter = Waiter (cont)
86+ loop@ while (true ) { // lock-free loop on state
87+ val state = this .state
88+ when (state) {
89+ is Empty -> {
90+ if (state.locked) {
91+ // try upgrade to queue & retry
92+ STATE .compareAndSet(this , state, LockFreeLinkedListHead ())
93+ continue @loop
94+ } else {
95+ // try lock
96+ if (STATE .compareAndSet(this , state, EmptyLocked )) {
97+ // locked
98+ cont.resume(Unit )
99+ return @sc
100+ }
101+ }
102+ }
103+ is UnlockOp -> { // help & retry
104+ state.helpComplete()
105+ continue @loop
106+ }
107+ else -> {
108+ state as LockFreeLinkedListHead // type assertion
109+ if (state.addLastIf(waiter, { this .state == = state })) {
110+ // added to waiter list!
111+ cont.initCancellability()
112+ cont.removeOnCompletion(waiter)
113+ return @sc
114+ }
115+ }
116+ }
117+ }
118+ }
119+
120+ /* *
121+ * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked.
122+ */
123+ public fun unlock () {
124+ while (true ) { // lock-free loop on state
125+ val state = this .state
126+ when (state) {
127+ is Empty -> {
128+ check(state.locked) { " Mutex is not locked" }
129+ if (STATE .compareAndSet(this , state, EmptyUnlocked )) return
130+ }
131+ is UnlockOp -> state.helpComplete()
132+ else -> {
133+ state as LockFreeLinkedListHead // type assertion
134+ val waiter = state.removeFirstOrNull()
135+ if (waiter == null ) {
136+ val op = UnlockOp (state)
137+ if (STATE .compareAndSet(this , state, op) && op.helpComplete()) return
138+ } else {
139+ val cont = (waiter as Waiter ).cont
140+ val token = cont.tryResume(Unit )
141+ if (token != null ) {
142+ // successfully resumed waiter that now is holding the lock
143+ cont.completeResume(token)
144+ return
145+ }
146+ }
147+ }
148+ }
149+ }
150+ }
151+
152+ private class Empty (val locked : Boolean ) {
153+ override fun toString (): String = " Empty[${if (locked) " Locked" else " Unlocked" } ]" ;
154+ }
155+
156+ private class Waiter (val cont : CancellableContinuation <Unit >) : LockFreeLinkedListNode()
157+
158+ // atomic unlock operation that checks that waiters queue is empty
159+ private inner class UnlockOp (val queue : LockFreeLinkedListHead ) {
160+ fun helpComplete (): Boolean {
161+ val success = queue.isEmpty // Note: queue cannot change anymore (so decision is consistent)
162+ val update: Any = if (success) EmptyUnlocked else queue
163+ STATE .compareAndSet(this @Mutex, this @UnlockOp, update)
164+ return success
165+ }
166+ }
167+ }
0 commit comments