File tree Expand file tree Collapse file tree 2 files changed +43
-1
lines changed
reactive/kotlinx-coroutines-reactive Expand file tree Collapse file tree 2 files changed +43
-1
lines changed Original file line number Diff line number Diff line change @@ -199,7 +199,9 @@ private class PublisherCoroutine<in T>(
199199 }
200200 }
201201
202- override fun onCancellation (cause : Throwable ? ) {
202+ override fun onCompletedExceptionally (exception : Throwable ) = onCompleted(Unit )
203+
204+ override fun onCompleted (value : Unit ) {
203205 while (true ) { // lock-free loop for nRequested
204206 val cur = _nRequested .value
205207 if (cur == SIGNALLED ) return // some other thread holding lock already signalled cancellation/completion
Original file line number Diff line number Diff line change @@ -90,4 +90,44 @@ class PublishTest : TestBase() {
9090 throw RuntimeException (" OK" )
9191 }.openSubscription()
9292 }
93+
94+ @Test
95+ fun testHandleFailureAfterCancel () = runTest(
96+ unhandled = listOf ({ it -> it is RuntimeException && it.message == " FAILED" })
97+ ){
98+ expect(1 )
99+ // Exception should be delivered to CoroutineExceptionHandler, because we create publisher
100+ // with the NonCancellable parent
101+ val publisher = publish<Unit >(NonCancellable + Dispatchers .Unconfined ) {
102+ try {
103+ expect(3 )
104+ delay(10000 )
105+ } finally {
106+ expect(5 )
107+ throw RuntimeException (" FAILED" ) // crash after cancel
108+ }
109+ }
110+ var sub: Subscription ? = null
111+ publisher.subscribe(object : Subscriber <Unit > {
112+ override fun onComplete () {
113+ expectUnreached()
114+ }
115+
116+ override fun onSubscribe (s : Subscription ) {
117+ expect(2 )
118+ sub = s
119+ }
120+
121+ override fun onNext (t : Unit? ) {
122+ expectUnreached()
123+ }
124+
125+ override fun onError (t : Throwable ? ) {
126+ expectUnreached()
127+ }
128+ })
129+ expect(4 )
130+ sub!! .cancel()
131+ finish(6 )
132+ }
93133}
You can’t perform that action at this time.
0 commit comments