@@ -47,7 +47,7 @@ class ArrayBroadcastChannel<E>(
4747 }
4848
4949 private val bufferLock = ReentrantLock ()
50- private val buffer: Array < Any ?> = arrayOfNulls<Any ?>(capacity) // guarded by lock
50+ private val buffer = arrayOfNulls<Any ?>(capacity) // guarded by bufferLock
5151
5252 // head & tail are Long (64 bits) and we assume that they never wrap around
5353 // head, tail, and size are guarded by bufferLock
@@ -58,24 +58,24 @@ class ArrayBroadcastChannel<E>(
5858 @Volatile
5959 private var size: Int = 0
6060
61+ /*
62+ Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
63+ - Write element to buffer then write "tail" (volatile)
64+ - Read "tail" (volatile), then read element from buffer
65+ So read/writes to buffer need not be volatile
66+ */
67+
6168 private val subs = CopyOnWriteArrayList <Subscriber <E >>()
6269
6370 override val isBufferAlwaysFull: Boolean get() = false
6471 override val isBufferFull: Boolean get() = size >= capacity
6572
6673 override fun openSubscription (): SubscriptionReceiveChannel <E > {
67- val sub = Subscriber (this , head)
68- subs.add(sub)
69- // between creating and adding of subscription into the list the buffer head could have been bumped past it,
70- // so here we check if it did happen and update the head in subscription in this case
71- // we did not leak newly created subscription yet, so its subHead cannot update
72- val head = this .head // volatile read after sub was added to subs
73- if (head != sub.subHead) {
74- // needs update
75- sub.subHead = head
76- updateHead() // and also must recompute head of the buffer
74+ bufferLock.withLock {
75+ val sub = Subscriber (this , head)
76+ subs.add(sub)
77+ return sub
7778 }
78- return sub
7979 }
8080
8181 override fun close (cause : Throwable ? ): Boolean {
@@ -122,12 +122,6 @@ class ArrayBroadcastChannel<E>(
122122 return OFFER_SUCCESS
123123 }
124124
125- private fun closeSubscriber (sub : Subscriber <E >) {
126- subs.remove(sub)
127- if (head == sub.subHead)
128- updateHead()
129- }
130-
131125 private fun checkSubOffers () {
132126 var updated = false
133127 var hasSubs = false
@@ -137,51 +131,54 @@ class ArrayBroadcastChannel<E>(
137131 if (sub.checkOffer()) updated = true
138132 }
139133 if (updated || ! hasSubs)
140- updateHead()
134+ updateHead(null )
141135 }
142136
143- private fun updateHead () {
144- // compute minHead w/o lock (it will be eventually consistent)
145- val minHead = computeMinHead()
146- // update head in a loop
147- while (true ) {
148- var send: Send ? = null
149- var token: Any? = null
150- bufferLock.withLock {
151- val tail = this .tail
152- var head = this .head
153- val targetHead = minHead.coerceAtMost(tail)
154- if (targetHead <= head) return // nothing to do -- head was already moved
155- var size = this .size
156- // clean up removed (on not need if we don't have any subscribers anymore)
157- while (head < targetHead) {
158- buffer[(head % capacity).toInt()] = null
159- val wasFull = size >= capacity
160- // update the size before checking queue (no more senders can queue up)
161- this .head = ++ head
162- this .size = -- size
163- if (wasFull) {
164- while (true ) {
165- send = takeFirstSendOrPeekClosed() ? : break // when when no sender
166- if (send is Closed <* >) break // break when closed for send
167- token = send!! .tryResumeSend(idempotent = null )
168- if (token != null ) {
169- // put sent element to the buffer
170- buffer[(tail % capacity).toInt()] = (send as Send ).pollResult
171- this .size = size + 1
172- this .tail = tail + 1
173- return @withLock // go out of lock to wakeup this sender
174- }
137+ private tailrec fun updateHead (removeSub : Subscriber <E >? ) {
138+ // update head in a tail rec loop
139+ var send: Send ? = null
140+ var token: Any? = null
141+ bufferLock.withLock {
142+ if (removeSub != null ) {
143+ subs.remove(removeSub)
144+ if (head != removeSub.subHead) return // no need to update
145+ }
146+ val minHead = computeMinHead()
147+ val tail = this .tail
148+ var head = this .head
149+ val targetHead = minHead.coerceAtMost(tail)
150+ if (targetHead <= head) return // nothing to do -- head was already moved
151+ var size = this .size
152+ // clean up removed (on not need if we don't have any subscribers anymore)
153+ while (head < targetHead) {
154+ buffer[(head % capacity).toInt()] = null
155+ val wasFull = size >= capacity
156+ // update the size before checking queue (no more senders can queue up)
157+ this .head = ++ head
158+ this .size = -- size
159+ if (wasFull) {
160+ while (true ) {
161+ send = takeFirstSendOrPeekClosed() ? : break // when when no sender
162+ if (send is Closed <* >) break // break when closed for send
163+ token = send!! .tryResumeSend(idempotent = null )
164+ if (token != null ) {
165+ // put sent element to the buffer
166+ buffer[(tail % capacity).toInt()] = (send as Send ).pollResult
167+ this .size = size + 1
168+ this .tail = tail + 1
169+ return @withLock // go out of lock to wakeup this sender
175170 }
176171 }
177172 }
178- return // done updating here -> return
179173 }
180- // we only get out of the lock normally when there is a sender to resume
181- send!! .completeResumeSend(token!! )
182- // since we've just sent an element, we might need to resume some receivers
183- checkSubOffers()
174+ return // done updating here -> return
184175 }
176+ // we only get out of the lock normally when there is a sender to resume
177+ send!! .completeResumeSend(token!! )
178+ // since we've just sent an element, we might need to resume some receivers
179+ checkSubOffers()
180+ // tailrec call to recheck
181+ updateHead(null )
185182 }
186183
187184 private fun computeMinHead (): Long {
@@ -196,9 +193,9 @@ class ArrayBroadcastChannel<E>(
196193
197194 private class Subscriber <E >(
198195 private val broadcastChannel : ArrayBroadcastChannel <E >,
199- @Volatile @JvmField var subHead : Long // guarded by lock
196+ @Volatile @JvmField var subHead : Long // guarded by subLock
200197 ) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
201- private val lock = ReentrantLock ()
198+ private val subLock = ReentrantLock ()
202199
203200 override val isBufferAlwaysEmpty: Boolean get() = false
204201 override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
@@ -207,7 +204,7 @@ class ArrayBroadcastChannel<E>(
207204
208205 override fun close () {
209206 if (close(cause = null ))
210- broadcastChannel.closeSubscriber( this )
207+ broadcastChannel.updateHead(removeSub = this )
211208 }
212209
213210 // returns true if subHead was updated and broadcast channel's head must be checked
@@ -220,7 +217,7 @@ class ArrayBroadcastChannel<E>(
220217 while (needsToCheckOfferWithoutLock()) {
221218 // just use `tryLock` here and break when some other thread is checking under lock
222219 // it means that `checkOffer` must be retried after every `unlock`
223- if (! lock .tryLock()) break
220+ if (! subLock .tryLock()) break
224221 val receive: ReceiveOrClosed <E >?
225222 val token: Any?
226223 try {
@@ -241,7 +238,7 @@ class ArrayBroadcastChannel<E>(
241238 this .subHead = subHead + 1 // retrieved element for this subscriber
242239 updated = true
243240 } finally {
244- lock .unlock()
241+ subLock .unlock()
245242 }
246243 receive!! .completeResumeReceive(token!! )
247244 }
@@ -253,10 +250,8 @@ class ArrayBroadcastChannel<E>(
253250 // result is `E | POLL_FAILED | Closed`
254251 override fun pollInternal (): Any? {
255252 var updated = false
256- val result: Any?
257- lock.lock()
258- try {
259- result = peekUnderLock()
253+ val result = subLock.withLock {
254+ val result = peekUnderLock()
260255 when {
261256 result is Closed <* > -> { /* just bail out of lock */ }
262257 result == = POLL_FAILED -> { /* just bail out of lock */ }
@@ -267,8 +262,7 @@ class ArrayBroadcastChannel<E>(
267262 updated = true
268263 }
269264 }
270- } finally {
271- lock.unlock()
265+ result
272266 }
273267 // do close outside of lock
274268 (result as ? Closed <* >)?.also { close(cause = it.closeCause) }
@@ -278,17 +272,15 @@ class ArrayBroadcastChannel<E>(
278272 updated = true
279273 // and finally update broadcast's channel head if needed
280274 if (updated)
281- broadcastChannel.updateHead()
275+ broadcastChannel.updateHead(null )
282276 return result
283277 }
284278
285279 // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
286280 override fun pollSelectInternal (select : SelectInstance <* >): Any? {
287281 var updated = false
288- var result: Any?
289- lock.lock()
290- try {
291- result = peekUnderLock()
282+ val result = subLock.withLock {
283+ var result = peekUnderLock()
292284 when {
293285 result is Closed <* > -> { /* just bail out of lock */ }
294286 result == = POLL_FAILED -> { /* just bail out of lock */ }
@@ -304,8 +296,7 @@ class ArrayBroadcastChannel<E>(
304296 }
305297 }
306298 }
307- } finally {
308- lock.unlock()
299+ result
309300 }
310301 // do close outside of lock
311302 (result as ? Closed <* >)?.also { close(cause = it.closeCause) }
@@ -315,7 +306,7 @@ class ArrayBroadcastChannel<E>(
315306 updated = true
316307 // and finally update broadcast's channel head if needed
317308 if (updated)
318- broadcastChannel.updateHead()
309+ broadcastChannel.updateHead(null )
319310 return result
320311 }
321312
0 commit comments