File tree Expand file tree Collapse file tree 3 files changed +47
-0
lines changed
reactive/kotlinx-coroutines-rx1
main/kotlin/kotlinx/coroutines/experimental/rx1
test/kotlin/kotlinx/coroutines/experimental/rx1 Expand file tree Collapse file tree 3 files changed +47
-0
lines changed Original file line number Diff line number Diff line change @@ -14,6 +14,7 @@ Suspending extension functions and suspending iteration:
1414
1515| ** Name** | ** Description**
1616| -------- | ---------------
17+ | [ Completable.awaitCompleted] [ rx.Completable.awaitCompleted ] | Awaits for completion of the completable value
1718| [ Single.await] [ rx.Single.await ] | Awaits for completion of the single value and returns it
1819| [ Observable.awaitFirst] [ rx.Observable.awaitFirst ] | Returns the first value from the given observable
1920| [ Observable.awaitFirstOrDefault] [ rx.Observable.awaitFirstOrDefault ] | Returns the first value from the given observable or default
@@ -44,6 +45,7 @@ Conversion functions:
4445[ rxCompletable ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-completable.html
4546[ rxSingle ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-single.html
4647[ rxObservable ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-observable.html
48+ [ rx.Completable.awaitCompleted ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-completable/await-completed.html
4749[ rx.Single.await ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-single/await.html
4850[ rx.Observable.awaitFirst ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first.html
4951[ rx.Observable.awaitFirstOrDefault ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-default.html
Original file line number Diff line number Diff line change @@ -22,6 +22,23 @@ import kotlinx.coroutines.experimental.Job
2222import kotlinx.coroutines.experimental.suspendCancellableCoroutine
2323import rx.*
2424
25+ // ------------------------ Completable ------------------------
26+
27+ /* *
28+ * Awaits for completion of this completable without blocking a thread.
29+ * Returns `Unit` or throws the corresponding exception if this completable had produced error.
30+ *
31+ * This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
32+ * suspending function is suspended, this function immediately resumes with [CancellationException].
33+ */
34+ public suspend fun Completable.awaitCompleted (): Unit = suspendCancellableCoroutine { cont ->
35+ subscribe(object : CompletableSubscriber {
36+ override fun onSubscribe (s : Subscription ) { cont.unsubscribeOnCompletion(s) }
37+ override fun onCompleted () { cont.resume(Unit ) }
38+ override fun onError (e : Throwable ) { cont.resumeWithException(e) }
39+ })
40+ }
41+
2542// ------------------------ Single ------------------------
2643
2744/* *
Original file line number Diff line number Diff line change @@ -83,4 +83,32 @@ class CompletableTest : TestBase() {
8383 yield ()
8484 finish(7 )
8585 }
86+
87+ @Test
88+ fun testAwaitSuccess () = runBlocking<Unit > {
89+ expect(1 )
90+ val completable = rxCompletable(context) {
91+ expect(3 )
92+ }
93+ expect(2 )
94+ completable.awaitCompleted() // shall launch coroutine
95+ finish(4 )
96+ }
97+
98+ @Test
99+ fun testAwaitFailure () = runBlocking<Unit > {
100+ expect(1 )
101+ val completable = rxCompletable(context) {
102+ expect(3 )
103+ throw RuntimeException (" OK" )
104+ }
105+ expect(2 )
106+ try {
107+ completable.awaitCompleted() // shall launch coroutine and throw exception
108+ expectUnreached()
109+ } catch (e: RuntimeException ) {
110+ finish(4 )
111+ assertThat(e.message, IsEqual (" OK" ))
112+ }
113+ }
86114}
You can’t perform that action at this time.
0 commit comments