22 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
44
5- @file:JvmMultifileClass
6- @file:JvmName(" FlowKt" )
7-
85package kotlinx.coroutines.reactive
96
7+ import kotlinx.atomicfu.*
108import kotlinx.coroutines.*
119import kotlinx.coroutines.channels.*
1210import kotlinx.coroutines.flow.*
1311import kotlinx.coroutines.flow.internal.*
12+ import kotlinx.coroutines.intrinsics.*
1413import org.reactivestreams.*
1514import java.util.*
1615import kotlin.coroutines.*
@@ -28,13 +27,11 @@ import kotlin.coroutines.*
2827public fun <T : Any > Publisher<T>.asFlow (): Flow <T > =
2928 PublisherAsFlow (this , 1 )
3029
31- @FlowPreview
32- @Deprecated(
33- message = " batchSize parameter is deprecated, use .buffer() instead to control the backpressure" ,
34- level = DeprecationLevel .ERROR ,
35- replaceWith = ReplaceWith (" asFlow().buffer(batchSize)" , imports = [" kotlinx.coroutines.flow.*" ])
36- )
37- public fun <T : Any > Publisher<T>.asFlow (batchSize : Int ): Flow <T > = asFlow().buffer(batchSize)
30+ /* *
31+ * Transforms the given flow to a spec-compliant [Publisher].
32+ */
33+ @ExperimentalCoroutinesApi
34+ public fun <T : Any > Flow<T>.asPublisher (): Publisher <T > = FlowAsPublisher (this )
3835
3936private class PublisherAsFlow <T : Any >(
4037 private val publisher : Publisher <T >,
@@ -137,4 +134,94 @@ private val contextInjectors: List<ContextInjector> =
137134 ServiceLoader .load(ContextInjector ::class .java, ContextInjector ::class .java.classLoader).toList()
138135
139136private fun <T > Publisher<T>.injectCoroutineContext (coroutineContext : CoroutineContext ) =
140- contextInjectors.fold(this ) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
137+ contextInjectors.fold(this ) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
138+
139+
140+ /* *
141+ * Adapter that transforms [Flow] into TCK-complaint [Publisher].
142+ * [cancel] invocation cancels the original flow.
143+ */
144+ @Suppress(" PublisherImplementation" )
145+ private class FlowAsPublisher <T : Any >(private val flow : Flow <T >) : Publisher<T> {
146+ override fun subscribe (subscriber : Subscriber <in T >? ) {
147+ if (subscriber == null ) throw NullPointerException ()
148+ subscriber.onSubscribe(FlowSubscription (flow, subscriber))
149+ }
150+ }
151+
152+ /* * @suppress */
153+ @InternalCoroutinesApi
154+ public class FlowSubscription <T >(
155+ @JvmField val flow : Flow <T >,
156+ @JvmField val subscriber : Subscriber <in T >
157+ ) : Subscription, AbstractCoroutine<Unit>(Dispatchers .Unconfined , false ) {
158+ private val requested = atomic(0L )
159+ private val producer = atomic<CancellableContinuation <Unit >? > (null )
160+
161+ override fun onStart () {
162+ ::flowProcessing.startCoroutineCancellable(this )
163+ }
164+
165+ private suspend fun flowProcessing () {
166+ try {
167+ consumeFlow()
168+ subscriber.onComplete()
169+ } catch (e: Throwable ) {
170+ try {
171+ if (e is CancellationException ) {
172+ subscriber.onComplete()
173+ } else {
174+ subscriber.onError(e)
175+ }
176+ } catch (e: Throwable ) {
177+ // Last ditch report
178+ handleCoroutineException(coroutineContext, e)
179+ }
180+ }
181+ }
182+
183+ /*
184+ * This method has at most one caller at any time (triggered from the `request` method)
185+ */
186+ private suspend fun consumeFlow () {
187+ flow.collect { value ->
188+ /*
189+ * Flow is scopeless, thus if it's not active, its subscription was cancelled.
190+ * No intermediate "child failed, but flow coroutine is not" states are allowed.
191+ */
192+ coroutineContext.ensureActive()
193+ if (requested.value <= 0L ) {
194+ suspendCancellableCoroutine<Unit > {
195+ producer.value = it
196+ if (requested.value != 0L ) it.resumeSafely()
197+ }
198+ }
199+ requested.decrementAndGet()
200+ subscriber.onNext(value)
201+ }
202+ }
203+
204+ override fun cancel () {
205+ cancel(null )
206+ }
207+
208+ override fun request (n : Long ) {
209+ if (n <= 0 ) {
210+ return
211+ }
212+ start()
213+ requested.update { value ->
214+ val newValue = value + n
215+ if (newValue <= 0L ) Long .MAX_VALUE else newValue
216+ }
217+ val producer = producer.getAndSet(null ) ? : return
218+ producer.resumeSafely()
219+ }
220+
221+ private fun CancellableContinuation<Unit>.resumeSafely () {
222+ val token = tryResume(Unit )
223+ if (token != null ) {
224+ completeResume(token)
225+ }
226+ }
227+ }
0 commit comments