1616
1717package kotlinx.coroutines.experimental
1818
19- import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20- import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
19+ import kotlinx.coroutines.experimental.internal.*
2120import java.util.concurrent.Future
22- import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2321import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
2422import kotlin.coroutines.experimental.AbstractCoroutineContextElement
2523import kotlin.coroutines.experimental.Continuation
@@ -260,20 +258,22 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
260258
261259 This state machine and its transition matrix are optimized for the common case when job is created in active
262260 state (EMPTY_A) and at most one completion listener is added to it during its life-time.
261+
262+ Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
263263 */
264264
265265 @Volatile
266- private var state : Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
266+ private var _state : Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
267267
268268 @Volatile
269269 private var registration: Job .Registration ? = null
270270
271271 protected companion object {
272272 @JvmStatic
273273 private val STATE : AtomicReferenceFieldUpdater <JobSupport , Any ?> =
274- AtomicReferenceFieldUpdater .newUpdater(JobSupport ::class .java, Any ::class .java, " state " )
274+ AtomicReferenceFieldUpdater .newUpdater(JobSupport ::class .java, Any ::class .java, " _state " )
275275
276- fun describeState (state : Any? ): String =
276+ fun stateToString (state : Any? ): String =
277277 if (state is Incomplete )
278278 if (state.isActive) " Active" else " New"
279279 else " Completed"
@@ -299,10 +299,16 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
299299 /* *
300300 * Returns current state of this job.
301301 */
302- protected fun getState (): Any? = state
302+ protected val state: Any? get() {
303+ while (true ) { // lock-free helping loop
304+ val state = _state
305+ if (state !is OpDescriptor ) return state
306+ state.perform(this )
307+ }
308+ }
303309
304310 /* *
305- * Tries to update current [state][getState] of this job.
311+ * Tries to update current [state] of this job.
306312 */
307313 internal fun updateState (expect : Any , update : Any? ): Boolean {
308314 if (! tryUpdateState(expect, update)) return false
@@ -347,14 +353,20 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
347353 afterCompletion(update)
348354 }
349355
350- final override val isActive: Boolean get() {
356+ public final override val isActive: Boolean get() {
351357 val state = this .state
352358 return state is Incomplete && state.isActive
353359 }
354360
355- final override val isCompleted: Boolean get() = state !is Incomplete
361+ public final override val isCompleted: Boolean get() = state !is Incomplete
362+
363+ // this is for `select` operator. `isSelected` state means "not new" (== was started or already completed)
364+ public val isSelected: Boolean get() {
365+ val state = this .state
366+ return state !is Incomplete || state.isActive
367+ }
356368
357- final override fun start (): Boolean {
369+ public final override fun start (): Boolean {
358370 while (true ) { // lock-free loop on state
359371 when (startInternal(state)) {
360372 0 -> return false
@@ -375,7 +387,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
375387 // LIST -- a list of completion handlers (either new or active)
376388 state is NodeList -> {
377389 if (state.isActive) return 0
378- if (! NodeList .ACTIVE .compareAndSet(state, 0 , 1 )) return - 1
390+ if (! NodeList .ACTIVE .compareAndSet(state, null , NodeList . ACTIVE_STATE )) return - 1
379391 onStart()
380392 return 1
381393 }
@@ -384,13 +396,53 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
384396 }
385397 }
386398
399+ internal fun describeStart (failureMarker : Any ): AtomicDesc =
400+ object : AtomicDesc () {
401+ override fun prepare (op : AtomicOp ): Any? {
402+ while (true ) { // lock-free loop on state
403+ val state = this @JobSupport._state
404+ when {
405+ state == = op -> return null // already in progress
406+ state is OpDescriptor -> state.perform(this @JobSupport) // help
407+ state == = EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
408+ if (STATE .compareAndSet(this @JobSupport, state, op)) return null // success
409+ }
410+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
411+ if (state.isActive) return failureMarker
412+ if (NodeList .ACTIVE .compareAndSet(state, null , op)) return null // success
413+ }
414+ else -> return failureMarker // not a new state
415+ }
416+ }
417+ }
418+
419+ override fun complete (op : AtomicOp , failure : Any? ) {
420+ val success = failure == null
421+ val state = this @JobSupport._state
422+ when {
423+ state == = op -> {
424+ if (STATE .compareAndSet(this @JobSupport, op, if (success) EmptyActive else EmptyNew )) {
425+ if (success) onStart()
426+ }
427+ }
428+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
429+ if (state._active == = op) {
430+ if (NodeList .ACTIVE .compareAndSet(state, op, if (success) NodeList .ACTIVE_STATE else null )) {
431+ if (success) onStart()
432+ }
433+ }
434+ }
435+ }
436+ }
437+ }
438+
387439 /* *
388440 * Override to provide the actual [start] action.
389441 */
390442 protected open fun onStart () {}
391443
392444 final override fun getCompletionException (): Throwable {
393- val state = getState()
445+ val state = this .state
394446 return when (state) {
395447 is Incomplete -> throw IllegalStateException (" Job has not completed yet" )
396448 is CompletedExceptionally -> state.exception
@@ -414,14 +466,14 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
414466 // EMPTY_NEW state -- no completion handlers, new
415467 state == = EmptyNew -> {
416468 // try to promote it to list in new state
417- STATE .compareAndSet(this , state, NodeList (active = 0 ))
469+ STATE .compareAndSet(this , state, NodeList (active = false ))
418470 }
419471 // SINGLE/SINGLE+ state -- one completion handler
420472 state is JobNode <* > -> {
421473 // try to promote it to list (SINGLE+ state)
422- state.addFirstIfEmpty (NodeList (active = 1 ))
474+ state.addOneIfEmpty (NodeList (active = true ))
423475 // it must be in SINGLE+ state or state has changed (node could have need removed from state)
424- val list = state.next() // either NodeList or somebody else won the race, updated state
476+ val list = state.next // either NodeList or somebody else won the race, updated state
425477 // just attempt converting it to list if state is still the same, then continue lock-free loop
426478 STATE .compareAndSet(this , state, list)
427479 }
@@ -498,25 +550,36 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
498550 ? : InvokeOnCompletion (this , handler)
499551
500552 // for nicer debugging
501- override fun toString (): String = " ${this ::class .java.simpleName} {${describeState (state)} }@${Integer .toHexString(System .identityHashCode(this ))} "
553+ override fun toString (): String = " ${this ::class .java.simpleName} {${stateToString (state)} }@${Integer .toHexString(System .identityHashCode(this ))} "
502554
503555 /* *
504- * Interface for incomplete [state][getState] of a job.
556+ * Interface for incomplete [state] of a job.
505557 */
506558 public interface Incomplete {
507559 val isActive: Boolean
508560 }
509561
510562 private class NodeList (
511- @Volatile
512- var active : Int
563+ active : Boolean
513564 ) : LockFreeLinkedListHead(), Incomplete {
514- override val isActive: Boolean get() = active != 0
565+ @Volatile
566+ var _active : Any? = if (active) ACTIVE_STATE else null
567+
568+ override val isActive: Boolean get() {
569+ while (true ) { // helper loop for atomic ops
570+ val active = this ._active
571+ if (active !is OpDescriptor ) return active != null
572+ active.perform(this )
573+ }
574+ }
515575
516576 companion object {
517577 @JvmStatic
518- val ACTIVE : AtomicIntegerFieldUpdater <NodeList > =
519- AtomicIntegerFieldUpdater .newUpdater(NodeList ::class .java, " active" )
578+ val ACTIVE : AtomicReferenceFieldUpdater <NodeList , Any ?> =
579+ AtomicReferenceFieldUpdater .newUpdater(NodeList ::class .java, Any ::class .java, " _active" )
580+
581+ @JvmStatic
582+ val ACTIVE_STATE = Symbol (" ACTIVE_STATE" )
520583 }
521584
522585 override fun toString (): String = buildString {
@@ -533,7 +596,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
533596 }
534597
535598 /* *
536- * Class for a [state][getState] of a job that had completed exceptionally, including cancellation.
599+ * Class for a [state] of a job that had completed exceptionally, including cancellation.
537600 *
538601 * @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
539602 * if created on first get from [exception] property.
0 commit comments