File tree Expand file tree Collapse file tree 2 files changed +22
-0
lines changed
reactive/kotlinx-coroutines-rx1/src
main/kotlin/kotlinx/coroutines/experimental/rx1
test/kotlin/kotlinx/coroutines/experimental/rx1 Expand file tree Collapse file tree 2 files changed +22
-0
lines changed Original file line number Diff line number Diff line change @@ -51,6 +51,16 @@ public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont
5151 */
5252public suspend fun <T > Observable<T>.awaitFirst (): T = first().awaitOne()
5353
54+ /* *
55+ * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
56+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
57+ *
58+ * This suspending function is cancellable.
59+ * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
60+ * immediately resumes with [CancellationException].
61+ */
62+ public suspend fun <T > Observable<T>.awaitFirstOrDefault (default : T ): T = firstOrDefault(default).awaitOne()
63+
5464/* *
5565 * Awaits for the last value from the given observable without blocking a thread and
5666 * returns the resulting value or throws the corresponding exception if this observable had produced error.
Original file line number Diff line number Diff line change @@ -90,6 +90,18 @@ class ObservableSingleTest {
9090
9191 @Test
9292 fun testAwaitFirst () {
93+ val observable = rxObservable(CommonPool ) {
94+ send(Observable .empty<String >().awaitFirstOrDefault(" O" ) + " K" )
95+ }
96+
97+ checkSingleValue(observable) {
98+ assertEquals(" OK" , it)
99+ }
100+ }
101+
102+
103+ @Test
104+ fun testAwaitFirstOrDefault () {
93105 val observable = rxObservable(CommonPool ) {
94106 send(Observable .just(" O" , " #" ).awaitFirst() + " K" )
95107 }
You can’t perform that action at this time.
0 commit comments