File tree Expand file tree Collapse file tree 5 files changed +87
-6
lines changed
kotlinx-coroutines-reactive/src
kotlinx-coroutines-reactor/test
kotlinx-coroutines-rx2/test Expand file tree Collapse file tree 5 files changed +87
-6
lines changed Original file line number Diff line number Diff line change @@ -292,19 +292,20 @@ OnSubscribe
2922921
2932932
2942943
295- 4
296295OnComplete
297296Finally
297+ 4
2982985
299299```
300300
301301<!-- - TEST -->
302302
303- Notice how "OnComplete" and "Finally" are printed before the last element "5". It happens because our ` main ` function in this
303+ Notice how "OnComplete" and "Finally" are printed before the lasts elements "4" and "5".
304+ It happens because our ` main ` function in this
304305example is a coroutine that we start with the [ runBlocking] coroutine builder.
305306Our main coroutine receives on the flowable using the ` source.collect { ... } ` expression.
306307The main coroutine is _ suspended_ while it waits for the source to emit an item.
307- When the last item is emitted by ` Flowable.range(1, 5) ` it
308+ When the last items are emitted by ` Flowable.range(1, 5) ` it
308309_ resumes_ the main coroutine, which gets dispatched onto the main thread to print this
309310 last element at a later point in time, while the source completes and prints "Finally".
310311
Original file line number Diff line number Diff line change @@ -17,11 +17,11 @@ import org.reactivestreams.*
1717 * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
1818 * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
1919 *
20- * @param request how many items to request from publisher in advance (optional, on-demand request by default).
20+ * @param request how many items to request from publisher in advance (optional, one by default).
2121 */
2222@ObsoleteCoroutinesApi
2323@Suppress(" CONFLICTING_OVERLOADS" )
24- public fun <T > Publisher<T>.openSubscription (request : Int = 0 ): ReceiveChannel <T > {
24+ public fun <T > Publisher<T>.openSubscription (request : Int = 1 ): ReceiveChannel <T > {
2525 val channel = SubscriptionChannel <T >(request)
2626 subscribe(channel)
2727 return channel
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
5+ package kotlinx.coroutines.reactor
6+
7+ import kotlinx.coroutines.*
8+ import kotlinx.coroutines.flow.*
9+ import kotlinx.coroutines.reactive.*
10+ import kotlinx.coroutines.reactive.flow.*
11+ import org.junit.Test
12+ import reactor.core.publisher.*
13+ import kotlin.test.*
14+
15+ class BackpressureTest : TestBase () {
16+ @Test
17+ fun testBackpressureDropDirect () = runTest {
18+ expect(1 )
19+ Flux .fromArray(arrayOf(1 ))
20+ .onBackpressureDrop()
21+ .collect {
22+ assertEquals(1 , it)
23+ expect(2 )
24+ }
25+ finish(3 )
26+ }
27+
28+ @Test
29+ fun testBackpressureDropFlow () = runTest {
30+ expect(1 )
31+ Flux .fromArray(arrayOf(1 ))
32+ .onBackpressureDrop()
33+ .asFlow()
34+ .collect {
35+ assertEquals(1 , it)
36+ expect(2 )
37+ }
38+ finish(3 )
39+ }
40+ }
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
5+ package kotlinx.coroutines.rx2
6+
7+ import io.reactivex.*
8+ import kotlinx.coroutines.*
9+ import kotlinx.coroutines.flow.*
10+ import kotlinx.coroutines.reactive.*
11+ import kotlinx.coroutines.reactive.flow.*
12+ import org.junit.Test
13+ import kotlin.test.*
14+
15+ class BackpressureTest : TestBase () {
16+ @Test
17+ fun testBackpressureDropDirect () = runTest {
18+ expect(1 )
19+ Flowable .fromArray(1 )
20+ .onBackpressureDrop()
21+ .collect {
22+ assertEquals(1 , it)
23+ expect(2 )
24+ }
25+ finish(3 )
26+ }
27+
28+ @Test
29+ fun testBackpressureDropFlow () = runTest {
30+ expect(1 )
31+ Flowable .fromArray(1 )
32+ .onBackpressureDrop()
33+ .asFlow()
34+ .collect {
35+ assertEquals(1 , it)
36+ expect(2 )
37+ }
38+ finish(3 )
39+ }
40+ }
Original file line number Diff line number Diff line change @@ -52,9 +52,9 @@ class GuideReactiveTest : ReactiveTestBase() {
5252 " 1" ,
5353 " 2" ,
5454 " 3" ,
55- " 4" ,
5655 " OnComplete" ,
5756 " Finally" ,
57+ " 4" ,
5858 " 5"
5959 )
6060 }
You can’t perform that action at this time.
0 commit comments