@@ -125,9 +125,43 @@ public suspend fun Job.join() {
125125 */
126126@Suppress(" LeakingThis" )
127127public open class JobSupport : AbstractCoroutineContextElement (Job ), Job {
128- // keeps a stack of cancel listeners or a special CANCELLED, other values denote completed scope
128+ /*
129+ === States ===
130+ name state class is Active?
131+ ------ ------------ ---------
132+ EMPTY Empty : Active -- no completion listener
133+ SINGLE JobNode : Active -- a single completion listener
134+ SINGLE+ JobNode : Active -- a single completion listener + NodeList added as its next
135+ LIST NodeList : Active -- a list of listeners (promoted just once, does not got back to JobNode/Empty)
136+ FINAL_C Cancelled : !Active -- cancelled (final state)
137+ FINAL_F Failed : !Active -- failed for other reason (final state)
138+ FINAL_R <any> : !Active -- produced some result
139+
140+ === Transitions ===
141+
142+ Active states !Active states
143+ +---------+ +----------+
144+ initial -+-> | EMPTY | ------------> | FINAL_* |
145+ | +---------+ +----------+
146+ | | ^ ^
147+ | V | |
148+ | +---------+ |
149+ | | SINGLE | --------------------+
150+ | +---------+ |
151+ | | |
152+ | V |
153+ | +---------+ |
154+ +-- | SINGLE+ | --------------------+
155+ +---------+ |
156+ | |
157+ V |
158+ +---------+ |
159+ | LIST | --------------------+
160+ +---------+
161+ */
162+
129163 @Volatile
130- private var state: Any? = ActiveList () // will drop the list on cancel
164+ private var state: Any? = Empty // shared object while we have no listeners
131165
132166 @Volatile
133167 private var registration: Job .Registration ? = null
@@ -138,7 +172,10 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
138172 AtomicReferenceFieldUpdater .newUpdater(JobSupport ::class .java, Any ::class .java, " state" )
139173 }
140174
141- // invoke at most once after construction after all other initialization
175+ /* *
176+ * Initializes parent job.
177+ * It shall be invoked at most once after construction after all other initialization.
178+ */
142179 public fun initParentJob (parent : Job ? ) {
143180 if (parent == null ) return
144181 check(registration == null )
@@ -149,11 +186,16 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
149186 if (state !is Active ) newRegistration.unregister()
150187 }
151188
152- protected fun getState (): Any? = state
189+ /* *
190+ * Returns current state of this job.
191+ */
192+ internal fun getState (): Any? = state
153193
194+ /* *
195+ * Tries to update current [state][getState] of this job.
196+ */
154197 protected fun updateState (expect : Any , update : Any? ): Boolean {
155- expect as ActiveList // assert type
156- require(update !is Active ) // only active -> inactive transition is allowed
198+ require(expect is Active && update !is Active ) // only active -> inactive transition is allowed
157199 if (! STATE .compareAndSet(this , expect, update)) return false
158200 // #1. Update linked state before invoking completion handlers
159201 onStateUpdate(update)
@@ -162,36 +204,93 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
162204 // #3. Invoke completion handlers
163205 val reason = (update as ? CompletedExceptionally )?.cancelReason
164206 var completionException: Throwable ? = null
165- expect.forEach<JobNode > { node ->
166- try {
167- node.invoke(reason)
207+ when (expect) {
208+ // SINGLE/SINGLE+ state -- one completion handler (common case)
209+ is JobNode -> try {
210+ expect.invoke(reason)
168211 } catch (ex: Throwable ) {
169- completionException?.apply { addSuppressed(ex) } ? : run { completionException = ex }
212+ completionException = ex
213+ }
214+ // LIST state -- a list of completion handlers
215+ is NodeList -> expect.forEach<JobNode > { node ->
216+ try {
217+ node.invoke(reason)
218+ } catch (ex: Throwable ) {
219+ completionException?.apply { addSuppressed(ex) } ? : run { completionException = ex }
220+ }
221+
170222 }
223+ // otherwise -- do nothing (Empty)
224+ else -> check(expect == Empty )
171225 }
172226 // #4. Do other (overridable) processing after completion handlers
173227 completionException?.let { handleCompletionException(it) }
174228 afterCompletion(update)
175229 return true
176230 }
177231
178- public override val isActive: Boolean get() = state is Active
232+ public final override val isActive: Boolean get() = state is Active
179233
180- public override fun onCompletion (handler : CompletionHandler ): Job .Registration {
234+ public final override fun onCompletion (handler : CompletionHandler ): Job .Registration {
181235 var nodeCache: JobNode ? = null
182236 while (true ) { // lock-free loop on state
183237 val state = this .state
184- if (state !is Active ) {
185- handler((state as ? Cancelled )?.cancelReason)
186- return EmptyRegistration
238+ when {
239+ // EMPTY state -- no completion handlers
240+ state == = Empty -> {
241+ // try move to SINGLE state
242+ val node = nodeCache ? : makeNode(handler).also { nodeCache = it }
243+ if (STATE .compareAndSet(this , state, node)) return node
244+ }
245+ // SINGLE/SINGLE+ state -- one completion handler
246+ state is JobNode -> {
247+ // try promote it to the list (SINGLE+ state)
248+ state.addIfEmpty(NodeList ())
249+ // it must be in SINGLE+ state or state has changed (node could have need removed from state)
250+ val list = state.next() // either NodeList or somebody else won the race, updated state
251+ // just attempt converting it to list if state is still the same, then continue lock-free loop
252+ STATE .compareAndSet(this , state, list)
253+ }
254+ // LIST -- a list of completion handlers
255+ state is NodeList -> {
256+ val node = nodeCache ? : makeNode(handler).also { nodeCache = it }
257+ if (state.addLastIf(node) { this .state == state }) return node
258+ }
259+ // is not active anymore
260+ else -> {
261+ handler((state as ? Cancelled )?.cancelReason)
262+ return EmptyRegistration
263+ }
264+ }
265+ }
266+ }
267+
268+ internal fun removeNode (node : JobNode ) {
269+ // remove logic depends on the state of the job
270+ while (true ) { // lock-free loop on job state
271+ val state = this .state
272+ when {
273+ // EMPTY state -- no completion handlers
274+ state == = Empty -> return
275+ // SINGE/SINGLE+ state -- one completion handler
276+ state is JobNode -> {
277+ if (state != = this ) return // a different job node --> we were already removed
278+ // try remove and revert back to empty state
279+ if (STATE .compareAndSet(this , state, Empty )) return
280+ }
281+ // LIST -- a list of completion handlers
282+ state is NodeList -> {
283+ // remove node from the list
284+ node.remove()
285+ return
286+ }
287+ // is not active anymore
288+ else -> return
187289 }
188- val node = nodeCache ? : makeNode(handler).apply { nodeCache = this }
189- state as ActiveList // assert type
190- if (state.addLastIf(node) { this .state == state }) return node
191290 }
192291 }
193292
194- public override fun cancel (reason : Throwable ? ): Boolean {
293+ public final override fun cancel (reason : Throwable ? ): Boolean {
195294 while (true ) { // lock-free loop on state
196295 val state = this .state as ? Active ? : return false // quit if not active anymore
197296 if (updateState(state, Cancelled (reason))) return true
@@ -219,16 +318,27 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
219318 (handler as ? JobNode )?.also { require(it.job == = this ) }
220319 ? : InvokeOnCompletion (this , handler)
221320
222- protected interface Active
321+ /* *
322+ * Marker interface for active [state][getState] of a job.
323+ */
324+ public interface Active
223325
224- private class ActiveList : LockFreeLinkedListHead (), Active
326+ private object Empty : Active
225327
226- protected abstract class CompletedExceptionally {
328+ private class NodeList : LockFreeLinkedListHead (), Active
329+
330+ /* *
331+ * Abstract class for a [state][getState] of a job that had completed exceptionally, including cancellation.
332+ */
333+ public abstract class CompletedExceptionally {
227334 abstract val cancelReason: Throwable // original reason or fresh CancellationException
228335 abstract val exception: Throwable // the exception to be thrown in continuation
229336 }
230337
231- protected class Cancelled (specifiedReason : Throwable ? ) : CompletedExceptionally() {
338+ /* *
339+ * Represents a [state][getState] of a cancelled job.
340+ */
341+ public class Cancelled (specifiedReason : Throwable ? ) : CompletedExceptionally() {
232342 @Volatile
233343 private var _cancelReason = specifiedReason // materialize CancellationException on first need
234344
@@ -246,20 +356,21 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
246356 .also { _exception = it }
247357 }
248358
249- protected class Failed (override val exception : Throwable ) : CompletedExceptionally() {
359+ /* *
360+ * Represents a [state][getState] of a failed job.
361+ */
362+ public class Failed (override val exception : Throwable ) : CompletedExceptionally() {
250363 override val cancelReason: Throwable
251364 get() = exception
252365 }
253366}
254367
255368internal abstract class JobNode (
256369 val job : Job
257- ) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler {
258- override fun unregister () {
259- // this is an object-allocation optimization -- do not remove if job is not active anymore
260- if (job.isActive) remove()
261- }
262-
370+ ) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Active {
371+ // if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
372+ // directly without wrapping
373+ final override fun unregister () = (job as JobSupport ).removeNode(this )
263374 override abstract fun invoke (reason : Throwable ? )
264375}
265376
0 commit comments