@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental
1919import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
2020import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
2121import kotlin.coroutines.experimental.Continuation
22- import kotlin.coroutines.experimental.ContinuationInterceptor
22+ import kotlin.coroutines.experimental.CoroutineContext
2323import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
2424import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2525import kotlin.coroutines.experimental.suspendCoroutine
@@ -57,18 +57,27 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
5757 * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
5858 * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
5959 * [completeResume] must be invoked with it.
60+ *
61+ * When [idempotent] is not `null`, this function performs _idempotent_ operation, so that
62+ * further invocations with the same non-null reference produce the same result.
63+ *
64+ * @suppress **This is unstable API and it is subject to change.**
6065 */
61- public fun tryResume (value : T ): Any?
66+ public fun tryResume (value : T , idempotent : Any? = null ): Any?
6267
6368 /* *
6469 * Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
6570 * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
6671 * [completeResume] must be invoked with it.
72+ *
73+ * @suppress **This is unstable API and it is subject to change.**
6774 */
6875 public fun tryResumeWithException (exception : Throwable ): Any?
6976
7077 /* *
7178 * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
79+ *
80+ * @suppress **This is unstable API and it is subject to change.**
7281 */
7382 public fun completeResume (token : Any )
7483
@@ -113,7 +122,7 @@ public inline suspend fun <T> suspendCancellableCoroutine(
113122 crossinline block : (CancellableContinuation <T >) -> Unit
114123): T =
115124 suspendCoroutineOrReturn { cont ->
116- val cancellable = CancellableContinuationImpl (cont, getParentJobOrAbort(cont), active = true )
125+ val cancellable = CancellableContinuationImpl (cont, active = true )
117126 if (! holdCancellability) cancellable.initCancellability()
118127 block(cancellable)
119128 cancellable.getResult()
@@ -140,58 +149,67 @@ private class RemoveOnCancel(
140149 override fun toString () = " RemoveOnCancel[$node ]"
141150}
142151
143- @PublishedApi
144- internal fun getParentJobOrAbort (cont : Continuation <* >): Job ? {
145- val job = cont.context[Job ]
146- // fast path when parent job is already complete (we don't even construct CancellableContinuationImpl object)
147- if (job != null && ! job.isActive) throw job.getCompletionException()
148- return job
149- }
150-
151152@PublishedApi
152153internal open class CancellableContinuationImpl <in T >(
153- private val delegate : Continuation < T >,
154- private val parentJob : Job ? ,
154+ @JvmField
155+ protected val delegate : Continuation < T > ,
155156 active : Boolean
156- ) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
157+ ) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
157158 @Volatile
158159 private var decision = UNDECIDED
159160
160- private companion object {
161+ override val parentContext: CoroutineContext
162+ get() = delegate.context
163+
164+ protected companion object {
165+ @JvmStatic
161166 val DECISION : AtomicIntegerFieldUpdater <CancellableContinuationImpl <* >> =
162167 AtomicIntegerFieldUpdater .newUpdater(CancellableContinuationImpl ::class .java, " decision" )
163168
164169 const val UNDECIDED = 0
165170 const val SUSPENDED = 1
166171 const val RESUMED = 2
167- const val YIELD = 3 // used by cancellable "yield"
168- const val UNDISPATCHED = 4 // used by "undispatchedXXX"
172+
173+ const val MODE_UNDISPATCHED = 1
174+ const val MODE_DIRECT = 2
175+
176+ @Suppress(" UNCHECKED_CAST" )
177+ fun <T > getSuccessfulResult (state : Any? ): T = if (state is CompletedIdempotentResult ) state.result as T else state as T
169178 }
170179
171180 override fun initCancellability () {
172- initParentJob(parentJob )
181+ initParentJob(delegate.context[ Job ] )
173182 }
174183
175184 @PublishedApi
176185 internal fun getResult (): Any? {
177186 val decision = this .decision // volatile read
178- when (decision) {
179- UNDECIDED -> if (DECISION .compareAndSet(this , UNDECIDED , SUSPENDED )) return COROUTINE_SUSPENDED
180- YIELD -> return COROUTINE_SUSPENDED
181- }
187+ if (decision == UNDECIDED && DECISION .compareAndSet(this , UNDECIDED , SUSPENDED )) return COROUTINE_SUSPENDED
182188 // otherwise, afterCompletion was already invoked, and the result is in the state
183189 val state = this .state
184190 if (state is CompletedExceptionally ) throw state.exception
185- return state
191+ return getSuccessfulResult( state)
186192 }
187193
188194 override val isCancelled: Boolean get() = state is Cancelled
189195
190- override fun tryResume (value : T ): Any? {
196+ override fun tryResume (value : T , idempotent : Any? ): Any? {
191197 while (true ) { // lock-free loop on state
192198 val state = this .state // atomic read
193199 when (state) {
194- is Incomplete -> if (tryUpdateState(state, value)) return state
200+ is Incomplete -> {
201+ val idempotentStart = state.idempotentStart
202+ val update: Any? = if (idempotent == null && idempotentStart == null ) value else
203+ CompletedIdempotentResult (idempotentStart, idempotent, value, state)
204+ if (tryUpdateState(state, update)) return state
205+ }
206+ is CompletedIdempotentResult -> {
207+ if (state.idempotentResume == = idempotent) {
208+ check(state.result == = value) { " Non-idempotent resume" }
209+ return state.token
210+ } else
211+ return null
212+ }
195213 else -> return null // cannot resume -- not active anymore
196214 }
197215 }
@@ -201,56 +219,69 @@ internal open class CancellableContinuationImpl<in T>(
201219 while (true ) { // lock-free loop on state
202220 val state = this .state // atomic read
203221 when (state) {
204- is Incomplete -> if (tryUpdateState(state, CompletedExceptionally (exception))) return state
222+ is Incomplete -> {
223+ if (tryUpdateState(state, CompletedExceptionally (state.idempotentStart, exception))) return state
224+ }
205225 else -> return null // cannot resume -- not active anymore
206226 }
207227 }
208228 }
209229
210230 override fun completeResume (token : Any ) {
211- completeUpdateState(token, state)
231+ completeUpdateState(token, state, mode = 0 )
212232 }
213233
214- @Suppress(" UNCHECKED_CAST" )
215- override fun afterCompletion (state : Any? ) {
234+ override fun afterCompletion (state : Any? , mode : Int ) {
216235 val decision = this .decision // volatile read
217236 if (decision == UNDECIDED && DECISION .compareAndSet(this , UNDECIDED , RESUMED )) return // will get result in getResult
218237 // otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
219- when {
220- decision == UNDISPATCHED -> undispatchedCompletion(state)
221- state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
222- decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T )
223- else -> delegate.resume(state as T )
238+ if (state is CompletedExceptionally ) {
239+ val exception = state.exception
240+ when (mode) {
241+ 0 -> delegate.resumeWithException(exception)
242+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation ).resumeUndispatchedWithException(exception)
243+ MODE_DIRECT -> {
244+ if (delegate is DispatchedContinuation )
245+ delegate.continuation.resumeWithException(exception)
246+ else
247+ delegate.resumeWithException(exception)
248+ }
249+ else -> error(" Invalid mode $mode " )
250+ }
251+ } else {
252+ val value = getSuccessfulResult<T >(state)
253+ when (mode) {
254+ 0 -> delegate.resume(value)
255+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation ).resumeUndispatched(value)
256+ MODE_DIRECT -> {
257+ if (delegate is DispatchedContinuation )
258+ delegate.continuation.resume(value)
259+ else
260+ delegate.resume(value)
261+ }
262+ else -> error(" Invalid mode $mode " )
263+ }
224264 }
225265 }
226266
227- @Suppress(" UNCHECKED_CAST" )
228- private fun undispatchedCompletion (state : Any? ) {
229- delegate as DispatchedContinuation // type assertion -- was checked in resumeUndispatched
230- if (state is CompletedExceptionally )
231- delegate.resumeUndispatchedWithException(state.exception)
232- else
233- delegate.resumeUndispatched(state as T )
234- }
235-
236- // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
237- fun resumeYield (value : T ) {
238- if ((context[ContinuationInterceptor ] as ? CoroutineDispatcher )?.isDispatchNeeded(context) == true )
239- DECISION .compareAndSet(this , UNDECIDED , YIELD ) // try mark as needing dispatch
240- resume(value)
241- }
242-
243267 override fun CoroutineDispatcher.resumeUndispatched (value : T ) {
244268 val dc = delegate as ? DispatchedContinuation ? : throw IllegalArgumentException (" Must be used with DispatchedContinuation" )
245269 check(dc.dispatcher == = this ) { " Must be invoked from the context CoroutineDispatcher" }
246- DECISION .compareAndSet(this @CancellableContinuationImpl, SUSPENDED , UNDISPATCHED )
247- resume(value)
270+ resume(value, MODE_UNDISPATCHED )
248271 }
249272
250273 override fun CoroutineDispatcher.resumeUndispatchedWithException (exception : Throwable ) {
251274 val dc = delegate as ? DispatchedContinuation ? : throw IllegalArgumentException (" Must be used with DispatchedContinuation" )
252275 check(dc.dispatcher == = this ) { " Must be invoked from the context CoroutineDispatcher" }
253- DECISION .compareAndSet(this @CancellableContinuationImpl, SUSPENDED , UNDISPATCHED )
254- resumeWithException(exception)
276+ resumeWithException(exception, MODE_UNDISPATCHED )
277+ }
278+
279+ private class CompletedIdempotentResult (
280+ idempotentStart : Any? ,
281+ @JvmField val idempotentResume : Any? ,
282+ @JvmField val result : Any? ,
283+ @JvmField val token : Incomplete
284+ ) : CompletedIdempotentStart(idempotentStart) {
285+ override fun toString (): String = " CompletedIdempotentResult[$result ]"
255286 }
256287}
0 commit comments