File tree Expand file tree Collapse file tree 2 files changed +44
-3
lines changed
kotlinx-coroutines-core/src
main/kotlin/kotlinx/coroutines/experimental/channels
test/kotlin/kotlinx/coroutines/experimental/channels Expand file tree Collapse file tree 2 files changed +44
-3
lines changed Original file line number Diff line number Diff line change @@ -338,13 +338,20 @@ class ArrayBroadcastChannel<E>(
338338 val subHead = this .subHead // guarded read (can be non-volatile read)
339339 // note: from the broadcastChannel we must read closed token first, then read its tail
340340 // because it is Ok if tail moves in between the reads (we make decision based on tail first)
341- val closed = broadcastChannel.closedForReceive // unguarded volatile read
341+ val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
342342 val tail = broadcastChannel.tail // unguarded volatile read
343343 if (subHead >= tail) {
344344 // no elements to poll from the queue -- check if closed
345- return closed ? : POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
345+ return closedBroadcast ? : POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
346346 }
347- return broadcastChannel.elementAt(subHead)
347+ // Get tentative result. This result may be wrong (completely invalid value, including null),
348+ // because this subscription might get closed, moving channel's head past this subscription's head.
349+ val result = broadcastChannel.elementAt(subHead)
350+ // now check if this subscription was closed
351+ val closedSub = this .closedForReceive
352+ if (closedSub != null ) return closedSub
353+ // we know the subscription was not closed, so this tentative result is Ok to return
354+ return result
348355 }
349356 }
350357}
Original file line number Diff line number Diff line change @@ -132,4 +132,38 @@ class ArrayBroadcastChannelTest : TestBase() {
132132 yield ()
133133 finish(6 )
134134 }
135+
136+ @Test
137+ fun testReceiveFullAfterClose () = runBlocking<Unit > {
138+ val channel = BroadcastChannel <Int >(10 )
139+ val sub = channel.openSubscription()
140+ // generate into buffer & close
141+ for (x in 1 .. 5 ) channel.send(x)
142+ channel.close()
143+ // make sure all of them are consumed
144+ check(! sub.isClosedForReceive)
145+ for (x in 1 .. 5 ) check(sub.receive() == x)
146+ check(sub.receiveOrNull() == null )
147+ check(sub.isClosedForReceive)
148+ }
149+
150+ @Test
151+ fun testCloseSubDuringIteration () = runBlocking<Unit > {
152+ val channel = BroadcastChannel <Int >(1 )
153+ // launch generator (for later) in this context
154+ launch(coroutineContext) {
155+ for (x in 1 .. 5 ) channel.send(x)
156+ channel.close()
157+ }
158+ // start consuming
159+ val sub = channel.openSubscription()
160+ var expected = 0
161+ sub.consumeEach {
162+ check(it == ++ expected)
163+ if (it == 2 ) {
164+ sub.close()
165+ }
166+ }
167+ check(expected == 2 )
168+ }
135169}
You can’t perform that action at this time.
0 commit comments