@@ -245,14 +245,15 @@ import kotlinx.coroutines.experimental.reactive.*
245245fun main (args : Array <String >) = runBlocking<Unit > {
246246 val source = Flowable .range(1 , 5 ) // a range of five numbers
247247 .doOnSubscribe { println (" OnSubscribe" ) } // provide some insight
248+ .doOnComplete { println (" OnComplete" ) } // ...
248249 .doFinally { println (" Finally" ) } // ... into what's going on
249250 var cnt = 0
250251 source.openSubscription().consume { // open channel to the source
251252 for (x in this ) { // iterate over the channel to receive elements from it
252253 println (x)
253254 if (++ cnt >= 3 ) break // break when 3 elements are printed
254255 }
255- // `use` will close the channel when this block of code is complete
256+ // Note: `consume` cancels the channel when this block of code is complete
256257 }
257258}
258259```
@@ -271,16 +272,16 @@ Finally
271272
272273<!-- - TEST -->
273274
274- With an explicit ` openSubscription ` we should [ close] [ SubscriptionReceiveChannel.close ] the corresponding
275- subscription to unsubscribe from the source. However, instead of invoking ` close ` explicitly,
276- this code relies on [ use] ( https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html )
277- function from Kotlin's standard library.
275+ With an explicit ` openSubscription ` we should [ cancel] [ SubscriptionReceiveChannel.cancel ] the corresponding
276+ subscription to unsubscribe from the source. There is no need to invoke ` cancel ` explicitly -- under the hood
277+ ` consume ` does that for us.
278278The installed
279279[ doFinally] ( http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action) )
280- listener prints "Finally" to confirm that the subscription is actually being closed.
281-
282- We do not need to use an explicit ` close ` if iteration is performed over all the items that are emitted
283- by the publisher, because it is being closed automatically by ` consumeEach ` :
280+ listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete"
281+ is never printed because we did not consume all of the elements.
282+
283+ We do not need to use an explicit ` cancel ` either if iteration is performed over all the items that are emitted
284+ by the publisher, because it is being cancelled automatically by ` consumeEach ` :
284285
285286<!-- - INCLUDE
286287import io.reactivex.*
@@ -293,6 +294,7 @@ import kotlin.coroutines.experimental.*
293294fun main (args : Array <String >) = runBlocking<Unit > {
294295 val source = Flowable .range(1 , 5 ) // a range of five numbers
295296 .doOnSubscribe { println (" OnSubscribe" ) } // provide some insight
297+ .doOnComplete { println (" OnComplete" ) } // ...
296298 .doFinally { println (" Finally" ) } // ... into what's going on
297299 // iterate over the source fully
298300 source.consumeEach { println (it) }
@@ -309,13 +311,14 @@ OnSubscribe
3093112
3103123
3113134
314+ OnComplete
312315Finally
3133165
314317```
315318
316319<!-- - TEST -->
317320
318- Notice, how "Finally" is printed before the last element "5". It happens because our ` main ` function in this
321+ Notice, how "OnComplete" and " Finally" are printed before the last element "5". It happens because our ` main ` function in this
319322example is a coroutine that we start with [ runBlocking] coroutine builder.
320323Our main coroutine receives on the channel using ` source.consumeEach { ... } ` expression.
321324The main coroutine is _ suspended_ while it waits for the source to emit an item.
@@ -1075,7 +1078,6 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
10751078[ produce ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
10761079[ consumeEach ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
10771080[ ReceiveChannel ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
1078- [ SubscriptionReceiveChannel.close ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-subscription-receive-channel/close.html
10791081[ SendChannel.send ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
10801082[ BroadcastChannel ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-broadcast-channel/index.html
10811083[ ConflatedBroadcastChannel ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-conflated-broadcast-channel/index.html
0 commit comments