77
88package kotlinx.coroutines.experimental
99
10+ import kotlinx.atomicfu.*
1011import kotlinx.coroutines.experimental.internal.*
1112import kotlinx.coroutines.experimental.intrinsics.*
1213import kotlinx.coroutines.experimental.selects.*
@@ -193,13 +194,30 @@ public fun launch(context: CoroutineContext, start: Boolean, block: suspend Coro
193194public suspend fun <T > withContext (
194195 context : CoroutineContext ,
195196 block : suspend CoroutineScope .() -> T
196- ): T =
197- // todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
198- withContextImpl(context, start = CoroutineStart .DEFAULT ) {
199- currentScope {
200- block()
197+ ): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
198+ // compute new context
199+ val oldContext = uCont.context
200+ val newContext = oldContext + context
201+ // FAST PATH #1 -- new context is the same as the old one
202+ if (newContext == = oldContext) {
203+ val coroutine = ScopeCoroutine (newContext, uCont) // MODE_DIRECT
204+ return @sc coroutine.startUndispatchedOrReturn(coroutine, block)
205+ }
206+ // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
207+ // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
208+ if (newContext[ContinuationInterceptor ] == oldContext[ContinuationInterceptor ]) {
209+ val coroutine = UndispatchedCoroutine (newContext, uCont) // MODE_UNDISPATCHED
210+ // There are changes in the context, so this thread needs to be updated
211+ withCoroutineContext(newContext) {
212+ return @sc coroutine.startUndispatchedOrReturn(coroutine, block)
201213 }
202214 }
215+ // SLOW PATH -- use new dispatcher
216+ val coroutine = DispatchedCoroutine (newContext, uCont) // MODE_DISPATCHED
217+ coroutine.initParentJob()
218+ block.startCoroutineCancellable(coroutine, coroutine)
219+ coroutine.getResult()
220+ }
203221
204222/* *
205223 * @suppress **Deprecated**: start parameter is deprecated, no replacement.
@@ -210,48 +228,7 @@ public suspend fun <T> withContext(
210228 start : CoroutineStart = CoroutineStart .DEFAULT ,
211229 block : suspend CoroutineScope .() -> T
212230): T =
213- // todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
214- withContextImpl(context, start) {
215- currentScope {
216- block()
217- }
218- }
219-
220- // todo: optimize it to reduce allocations
221- private suspend fun <T > withContextImpl (
222- context : CoroutineContext ,
223- start : CoroutineStart = CoroutineStart .DEFAULT ,
224- block : suspend () -> T
225- ): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
226- val oldContext = uCont.context
227- // fast path #1 if there is no change in the actual context:
228- if (context == = oldContext || context is CoroutineContext .Element && oldContext[context.key] == = context)
229- return @sc block.startCoroutineUninterceptedOrReturn(uCont)
230- // compute new context
231- val newContext = oldContext + context
232- // fast path #2 if the result is actually the same
233- if (newContext == = oldContext)
234- return @sc block.startCoroutineUninterceptedOrReturn(uCont)
235- // fast path #3 if the new dispatcher is the same as the old one.
236- // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
237- if (newContext[ContinuationInterceptor ] == oldContext[ContinuationInterceptor ]) {
238- val newContinuation = RunContinuationUnintercepted (newContext, uCont)
239- // There are some other changes in the context, so this thread needs to be updated
240- withCoroutineContext(newContext) {
241- return @sc block.startCoroutineUninterceptedOrReturn(newContinuation)
242- }
243- }
244- // slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
245- require(! start.isLazy) { " $start start is not supported" }
246- val completion = RunCompletion (
247- context = newContext,
248- delegate = uCont.intercepted(), // delegate to continuation intercepted with old dispatcher on completion
249- resumeMode = if (start == CoroutineStart .ATOMIC ) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
250- )
251- completion.initParentJobInternal(newContext[Job ]) // attach to job
252- start(block, completion)
253- completion.getResult()
254- }
231+ withContext(context, block)
255232
256233/* * @suppress **Deprecated**: Binary compatibility */
257234@Deprecated(level = DeprecationLevel .HIDDEN , message = " Binary compatibility" )
@@ -261,7 +238,7 @@ public suspend fun <T> withContext0(
261238 start : CoroutineStart = CoroutineStart .DEFAULT ,
262239 block : suspend () -> T
263240): T =
264- withContextImpl (context, start, block)
241+ withContext (context) { block() }
265242
266243/* * @suppress **Deprecated**: Renamed to [withContext]. */
267244@Deprecated(message = " Renamed to `withContext`" , level= DeprecationLevel .WARNING ,
@@ -271,12 +248,12 @@ public suspend fun <T> run(
271248 start : CoroutineStart = CoroutineStart .DEFAULT ,
272249 block : suspend () -> T
273250): T =
274- withContextImpl (context, start, block)
251+ withContext (context) { block() }
275252
276253/* * @suppress **Deprecated** */
277254@Deprecated(message = " It is here for binary compatibility only" , level= DeprecationLevel .HIDDEN )
278255public suspend fun <T > run (context : CoroutineContext , block : suspend () -> T ): T =
279- withContextImpl (context, start = CoroutineStart . ATOMIC , block = block)
256+ withContext (context) { block() }
280257
281258// --------------- implementation ---------------
282259
@@ -297,29 +274,61 @@ private class LazyStandaloneCoroutine(
297274 }
298275}
299276
300- private class RunContinuationUnintercepted <in T >(
301- override val context : CoroutineContext ,
302- private val continuation : Continuation <T >
303- ): Continuation<T> {
304- override fun resume (value : T ) {
305- withCoroutineContext(continuation.context) {
306- continuation.resume(value)
277+ // Used by withContext when context changes, but dispatcher stays the same
278+ private class UndispatchedCoroutine <in T >(
279+ context : CoroutineContext ,
280+ uCont : Continuation <T >
281+ ) : ScopeCoroutine<T>(context, uCont) {
282+ override val defaultResumeMode: Int get() = MODE_UNDISPATCHED
283+ }
284+
285+ private const val UNDECIDED = 0
286+ private const val SUSPENDED = 1
287+ private const val RESUMED = 2
288+
289+ // Used by withContext when context dispatcher changes
290+ private class DispatchedCoroutine <in T >(
291+ context : CoroutineContext ,
292+ uCont : Continuation <T >
293+ ) : ScopeCoroutine<T>(context, uCont) {
294+ override val defaultResumeMode: Int get() = MODE_CANCELLABLE
295+
296+ // this is copy-and-paste of a decision state machine inside AbstractionContinuation
297+ // todo: we may some-how abstract it via inline class
298+ private val _decision = atomic(UNDECIDED )
299+
300+ private fun trySuspend (): Boolean {
301+ _decision .loop { decision ->
302+ when (decision) {
303+ UNDECIDED -> if (this ._decision .compareAndSet(UNDECIDED , SUSPENDED )) return true
304+ RESUMED -> return false
305+ else -> error(" Already suspended" )
306+ }
307307 }
308308 }
309309
310- override fun resumeWithException (exception : Throwable ) {
311- withCoroutineContext(continuation.context) {
312- continuation.resumeWithException(exception)
310+ private fun tryResume (): Boolean {
311+ _decision .loop { decision ->
312+ when (decision) {
313+ UNDECIDED -> if (this ._decision .compareAndSet(UNDECIDED , RESUMED )) return true
314+ SUSPENDED -> return false
315+ else -> error(" Already resumed" )
316+ }
313317 }
314318 }
315- }
316319
317- @Suppress(" UNCHECKED_CAST" )
318- private class RunCompletion <in T >(
319- override val context : CoroutineContext ,
320- delegate : Continuation <T >,
321- resumeMode : Int
322- ) : AbstractContinuation<T>(delegate, resumeMode) {
320+ override fun onCompletionInternal (state : Any? , mode : Int , suppressed : Boolean ) {
321+ if (tryResume()) return // completed before getResult invocation -- bail out
322+ // otherwise, getResult has already commenced, i.e. completed later or in other thread
323+ super .onCompletionInternal(state, mode, suppressed)
324+ }
323325
324- override val useCancellingState: Boolean get() = true
326+ fun getResult (): Any? {
327+ if (trySuspend()) return COROUTINE_SUSPENDED
328+ // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
329+ val state = this .state
330+ if (state is CompletedExceptionally ) throw state.cause
331+ @Suppress(" UNCHECKED_CAST" )
332+ return state as T
333+ }
325334}
0 commit comments