File tree Expand file tree Collapse file tree 2 files changed +49
-1
lines changed
reactive/kotlinx-coroutines-reactive Expand file tree Collapse file tree 2 files changed +49
-1
lines changed Original file line number Diff line number Diff line change @@ -132,7 +132,15 @@ private class PublisherCoroutine<in T>(
132132 throw getCancellationException()
133133 }
134134 // notify subscriber
135- subscriber.onNext(elem)
135+ try {
136+ subscriber.onNext(elem)
137+ } catch (e: Throwable ) {
138+ // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
139+ // to abort the corresponding send/offer invocation
140+ cancel(e)
141+ unlockAndCheckCompleted()
142+ throw e
143+ }
136144 // now update nRequested
137145 while (true ) { // lock-free loop on nRequested
138146 val cur = _nRequested .value
Original file line number Diff line number Diff line change @@ -168,5 +168,45 @@ class PublishTest : TestBase() {
168168 expectUnreached()
169169 }
170170
171+ @Test
172+ fun testOnNextError () = runTest {
173+ expect(1 )
174+ val publisher = publish<String >(NonCancellable ) {
175+ expect(4 )
176+ try {
177+ send(" OK" )
178+ } catch (e: Throwable ) {
179+ expect(6 )
180+ assert (e is TestException )
181+ }
182+ }
183+ expect(2 )
184+ val latch = CompletableDeferred <Unit >()
185+ publisher.subscribe(object : Subscriber <String > {
186+ override fun onComplete () {
187+ expectUnreached()
188+ }
189+
190+ override fun onSubscribe (s : Subscription ) {
191+ expect(3 )
192+ s.request(1 )
193+ }
194+
195+ override fun onNext (t : String ) {
196+ expect(5 )
197+ assertEquals(" OK" , t)
198+ throw TestException ()
199+ }
200+
201+ override fun onError (t : Throwable ) {
202+ expect(7 )
203+ assert (t is TestException )
204+ latch.complete(Unit )
205+ }
206+ })
207+ latch.await()
208+ finish(8 )
209+ }
210+
171211 private class TestException : Exception ()
172212}
You can’t perform that action at this time.
0 commit comments