1+ /*
2+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
5+ package kotlinx.coroutines.rx2
6+
7+ import io.reactivex.Observable
8+ import io.reactivex.ObservableSource
9+ import io.reactivex.Observer
10+ import io.reactivex.disposables.Disposables
11+ import io.reactivex.subjects.PublishSubject
12+ import kotlinx.coroutines.*
13+ import kotlinx.coroutines.channels.*
14+ import kotlinx.coroutines.flow.*
15+ import kotlin.test.*
16+
17+ class ObservableAsFlowTest : TestBase () {
18+ @Test
19+ fun testCancellation () = runTest {
20+ var onNext = 0
21+ var onCancelled = 0
22+ var onError = 0
23+
24+ val source = rxObservable(currentDispatcher()) {
25+ coroutineContext[Job ]?.invokeOnCompletion {
26+ if (it is CancellationException ) ++ onCancelled
27+ }
28+
29+ repeat(100 ) {
30+ send(it)
31+ }
32+ }
33+
34+ source.asFlow().launchIn(CoroutineScope (Dispatchers .Unconfined )) {
35+ onEach {
36+ ++ onNext
37+ throw RuntimeException ()
38+ }
39+ catch <Throwable > {
40+ ++ onError
41+ }
42+ }.join()
43+
44+
45+ assertEquals(1 , onNext)
46+ assertEquals(1 , onError)
47+ assertEquals(1 , onCancelled)
48+ }
49+
50+ @Test
51+ fun testImmediateCollection () {
52+ val source = PublishSubject .create<Int >()
53+ val flow = source.asFlow()
54+ GlobalScope .launch(Dispatchers .Unconfined ) {
55+ expect(1 )
56+ flow.collect { expect(it) }
57+ expect(6 )
58+ }
59+ expect(2 )
60+ source.onNext(3 )
61+ expect(4 )
62+ source.onNext(5 )
63+ source.onComplete()
64+ finish(7 )
65+ }
66+
67+ @Test
68+ fun testOnErrorCancellation () {
69+ val source = PublishSubject .create<Int >()
70+ val flow = source.asFlow()
71+ val exception = RuntimeException ()
72+ GlobalScope .launch(Dispatchers .Unconfined ) {
73+ try {
74+ expect(1 )
75+ flow.collect { expect(it) }
76+ expectUnreached()
77+ }
78+ catch (e: Exception ) {
79+ assertSame(exception, e.cause)
80+ expect(5 )
81+ }
82+ expect(6 )
83+ }
84+ expect(2 )
85+ source.onNext(3 )
86+ expect(4 )
87+ source.onError(exception)
88+ finish(7 )
89+ }
90+
91+ @Test
92+ fun testUnsubscribeOnCollectionException () {
93+ val source = PublishSubject .create<Int >()
94+ val flow = source.asFlow()
95+ val exception = RuntimeException ()
96+ GlobalScope .launch(Dispatchers .Unconfined ) {
97+ try {
98+ expect(1 )
99+ flow.collect {
100+ expect(it)
101+ if (it == 3 ) throw exception
102+ }
103+ expectUnreached()
104+ }
105+ catch (e: Exception ) {
106+ assertSame(exception, e.cause)
107+ expect(4 )
108+ }
109+ expect(5 )
110+ }
111+ expect(2 )
112+ assertTrue(source.hasObservers())
113+ source.onNext(3 )
114+ assertFalse(source.hasObservers())
115+ finish(6 )
116+ }
117+
118+ @Test
119+ fun testLateOnSubscribe () {
120+ var observer: Observer <in Int >? = null
121+ val source = ObservableSource <Int > { observer = it }
122+ val flow = source.asFlow()
123+ assertNull(observer)
124+ val job = GlobalScope .launch(Dispatchers .Unconfined ) {
125+ expect(1 )
126+ flow.collect { expectUnreached() }
127+ expectUnreached()
128+ }
129+ expect(2 )
130+ assertNotNull(observer)
131+ job.cancel()
132+ val disposable = Disposables .empty()
133+ observer!! .onSubscribe(disposable)
134+ assertTrue(disposable.isDisposed)
135+ finish(3 )
136+ }
137+
138+ @Test
139+ fun testBufferUnlimited () = runTest {
140+ val source = rxObservable(currentDispatcher()) {
141+ expect(1 ); send(10 )
142+ expect(2 ); send(11 )
143+ expect(3 ); send(12 )
144+ expect(4 ); send(13 )
145+ expect(5 ); send(14 )
146+ expect(6 ); send(15 )
147+ expect(7 ); send(16 )
148+ expect(8 ); send(17 )
149+ expect(9 )
150+ }
151+ source.asFlow().buffer(Channel .UNLIMITED ).collect { expect(it) }
152+ finish(18 )
153+ }
154+
155+ @Test
156+ fun testConflated () = runTest {
157+ val source = Observable .range(1 , 5 )
158+ val list = source.asFlow().conflate().toList()
159+ assertEquals(listOf (1 , 5 ), list)
160+ }
161+
162+ @Test
163+ fun testLongRange () = runTest {
164+ val source = Observable .range(1 , 10_000 )
165+ val count = source.asFlow().count()
166+ assertEquals(10_000 , count)
167+ }
168+
169+ @Test
170+ fun testProduce () = runTest {
171+ val source = Observable .range(0 , 10 )
172+ val flow = source.asFlow()
173+ check((0 .. 9 ).toList(), flow.produceIn(this ))
174+ check((0 .. 9 ).toList(), flow.buffer(Channel .UNLIMITED ).produceIn(this ))
175+ check((0 .. 9 ).toList(), flow.buffer(2 ).produceIn(this ))
176+ check((0 .. 9 ).toList(), flow.buffer(0 ).produceIn(this ))
177+ check(listOf (0 , 9 ), flow.conflate().produceIn(this ))
178+ }
179+
180+ private suspend fun check (expected : List <Int >, channel : ReceiveChannel <Int >) {
181+ val result = ArrayList <Int >(10 )
182+ channel.consumeEach { result.add(it) }
183+ assertEquals(expected, result)
184+ }
185+ }
0 commit comments