@@ -321,15 +321,15 @@ _suspend_ and they provide a natural answer to handling backpressure.
321321In Rx Java 2.x a backpressure-capable class is called
322322[ Flowable] ( http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html ) .
323323In the following example we use [ rxFlowable] coroutine builder from ` kotlinx-coroutines-rx2 ` module to define a
324- flowable that sends five integers from 1 to 5 .
324+ flowable that sends three integers from 1 to 3 .
325325It prints a message to the output before invocation of
326326suspending [ send] [ SendChannel.send ] function, so that we can study how it operates.
327327
328328The integers are generated in the context of the main thread, but subscription is shifted
329329to another thread using Rx
330330[ observeOn] ( http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int) )
331331operator with a buffer of size 1.
332- The subscriber is slow. It takes 200 ms to process each item, which is simulated using ` Thread.sleep ` .
332+ The subscriber is slow. It takes 500 ms to process each item, which is simulated using ` Thread.sleep ` .
333333
334334<!-- - INCLUDE
335335import kotlinx.coroutines.experimental.*
@@ -341,20 +341,20 @@ import io.reactivex.schedulers.Schedulers
341341fun main (args : Array <String >) = runBlocking<Unit > {
342342 // coroutine -- fast producer of elements in the context of the main thread
343343 val source = rxFlowable(context) {
344- for (x in 1 .. 5 ) {
345- println (" Sending $x ..." )
344+ for (x in 1 .. 3 ) {
346345 send(x) // this is a suspending function
346+ println (" Sent $x " ) // print after successfully sent item
347347 }
348348 }
349349 // subscribe on another thread with a slow subscriber using Rx
350350 source
351351 .observeOn(Schedulers .io(), false , 1 ) // specify buffer size of 1 item
352352 .doOnComplete { println (" Complete" ) }
353353 .subscribe { x ->
354- println ( " Received $x " )
355- Thread .sleep( 300 ) // 300 ms to process each item
354+ Thread .sleep( 500 ) // 500ms to process each item
355+ println ( " Processed $x " )
356356 }
357- delay(2000 ) // suspend main thread for couple of seconds
357+ delay(2000 ) // suspend the main thread for a few seconds
358358}
359359```
360360
@@ -363,23 +363,19 @@ fun main(args: Array<String>) = runBlocking<Unit> {
363363The output of this code nicely illustrates how backpressure works with coroutines:
364364
365365``` text
366- Sending 1 ...
367- Sending 2 ...
368- Received 1
369- Sending 3 ...
370- Received 2
371- Sending 4 ...
372- Received 3
373- Sending 5 ...
374- Received 4
375- Received 5
366+ Sent 1
367+ Processed 1
368+ Sent 2
369+ Processed 2
370+ Sent 3
371+ Processed 3
376372Complete
377373```
378374
379375<!-- - TEST -->
380376
381377We see here how producer coroutine puts the first element in the buffer and is suspended while trying to send another
382- one. Only after consumer receives the first item, the sender resumes to produce more .
378+ one. Only after consumer processes the first item, producer sends the second one and resumes, etc .
383379
384380
385381### Rx Subject vs BroadcastChannel
0 commit comments