1+ /*
2+ * Copyright 2016-2017 JetBrains s.r.o.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package kotlinx.coroutines.experimental.channels
18+
19+ import kotlinx.coroutines.experimental.ALREADY_SELECTED
20+ import kotlinx.coroutines.experimental.selects.SelectInstance
21+ import java.util.concurrent.CopyOnWriteArrayList
22+ import java.util.concurrent.locks.ReentrantLock
23+ import kotlin.concurrent.withLock
24+
25+ /* *
26+ * Broadcast channel with array buffer of a fixed [capacity].
27+ * Sender suspends only when buffer is fully due to one of the receives not being late and
28+ * receiver suspends only when buffer is empty.
29+ *
30+ * Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
31+ * lost.
32+ *
33+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
34+ * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
35+ * The lists of suspended senders or receivers are lock-free.
36+ */
37+ class ArrayBroadcastChannel <E >(
38+ /* *
39+ * Buffer capacity.
40+ */
41+ val capacity : Int
42+ ) : AbstractSendChannel<E>(), BroadcastChannel<E> {
43+ init {
44+ check(capacity >= 1 ) { " ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
45+ }
46+
47+ private val bufferLock = ReentrantLock ()
48+ private val buffer: Array <Any ?> = arrayOfNulls<Any ?>(capacity) // guarded by lock
49+
50+ // head & tail are Long (64 bits) and we assume that they never wrap around
51+ // head, tail, and size are guarded by bufferLock
52+ @Volatile
53+ private var head: Long = 0 // do modulo on use of head
54+ @Volatile
55+ private var tail: Long = 0 // do modulo on use of tail
56+ @Volatile
57+ private var size: Int = 0
58+
59+ private val subs = CopyOnWriteArrayList <Subscriber <E >>()
60+
61+ override val isBufferAlwaysFull: Boolean get() = false
62+ override val isBufferFull: Boolean get() = size >= capacity
63+
64+ override fun open (): SubscriptionReceiveChannel <E > {
65+ val sub = Subscriber (this , head)
66+ subs.add(sub)
67+ // between creating and adding of subscription into the list the buffer head could have been bumped past it,
68+ // so here we check if it did happen and update the head in subscription in this case
69+ // we did not leak newly created subscription yet, so its subHead cannot update
70+ val head = this .head // volatile read after sub was added to subs
71+ if (head != sub.subHead) {
72+ // needs update
73+ sub.subHead = head
74+ updateHead() // and also must recompute head of the buffer
75+ }
76+ return sub
77+ }
78+
79+ override fun close (cause : Throwable ? ): Boolean {
80+ if (! super .close(cause)) return false
81+ checkSubOffers()
82+ return true
83+ }
84+
85+ // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
86+ override fun offerInternal (element : E ): Any {
87+ bufferLock.withLock {
88+ // check if closed for send (under lock, so size cannot change)
89+ closedForSend?.let { return it }
90+ val size = this .size
91+ if (size >= capacity) return OFFER_FAILED
92+ val tail = this .tail
93+ buffer[(tail % capacity).toInt()] = element
94+ this .size = size + 1
95+ this .tail = tail + 1
96+ }
97+ // if offered successfully, then check subs outside of lock
98+ checkSubOffers()
99+ return OFFER_SUCCESS
100+ }
101+
102+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
103+ override fun offerSelectInternal (element : E , select : SelectInstance <* >): Any {
104+ bufferLock.withLock {
105+ // check if closed for send (under lock, so size cannot change)
106+ closedForSend?.let { return it }
107+ val size = this .size
108+ if (size >= capacity) return OFFER_FAILED
109+ // let's try to select sending this element to buffer
110+ if (! select.trySelect(null )) { // :todo: move trySelect completion outside of lock
111+ return ALREADY_SELECTED
112+ }
113+ val tail = this .tail
114+ buffer[(tail % capacity).toInt()] = element
115+ this .size = size + 1
116+ this .tail = tail + 1
117+ }
118+ // if offered successfully, then check subs outside of lock
119+ checkSubOffers()
120+ return OFFER_SUCCESS
121+ }
122+
123+ private fun closeSubscriber (sub : Subscriber <E >) {
124+ subs.remove(sub)
125+ if (head == sub.subHead)
126+ updateHead()
127+ }
128+
129+ private fun checkSubOffers () {
130+ var updated = false
131+ @Suppress(" LoopToCallChain" ) // must invoke `checkOffer` on every sub
132+ for (sub in subs) {
133+ if (sub.checkOffer()) updated = true
134+ }
135+ if (updated)
136+ updateHead()
137+ }
138+
139+ private fun updateHead () {
140+ // compute minHead w/o lock (it will be eventually consistent)
141+ val minHead = computeMinHead()
142+ // update head in a loop
143+ while (true ) {
144+ var send: Send ? = null
145+ var token: Any? = null
146+ bufferLock.withLock {
147+ val tail = this .tail
148+ var head = this .head
149+ val targetHead = minHead.coerceAtMost(tail)
150+ if (targetHead <= head) return // nothing to do -- head was already moved
151+ var size = this .size
152+ // clean up removed (on not need if we don't have any subscribers anymore)
153+ while (head < targetHead) {
154+ buffer[(head % capacity).toInt()] = null
155+ val wasFull = size >= capacity
156+ // update the size before checking queue (no more senders can queue up)
157+ this .head = ++ head
158+ this .size = -- size
159+ if (wasFull) {
160+ while (true ) {
161+ send = takeFirstSendOrPeekClosed() ? : break // when when no sender
162+ if (send is Closed <* >) break // break when closed for send
163+ token = send!! .tryResumeSend(idempotent = null )
164+ if (token != null ) {
165+ // put sent element to the buffer
166+ buffer[(tail % capacity).toInt()] = (send as Send ).pollResult
167+ this .size = size + 1
168+ this .tail = tail + 1
169+ return @withLock // go out of lock to wakeup this sender
170+ }
171+ }
172+ }
173+ }
174+ return // done updating here -> return
175+ }
176+ // we only get out of the lock normally when there is a sender to resume
177+ send!! .completeResumeSend(token!! )
178+ // since we've just sent an element, we might need to resume some receivers
179+ checkSubOffers()
180+ }
181+ }
182+
183+ private fun computeMinHead (): Long {
184+ var minHead = Long .MAX_VALUE
185+ for (sub in subs)
186+ minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
187+ return minHead
188+ }
189+
190+ @Suppress(" UNCHECKED_CAST" )
191+ private fun elementAt (index : Long ): E = buffer[(index % capacity).toInt()] as E
192+
193+ private class Subscriber <E >(
194+ private val broadcastChannel : ArrayBroadcastChannel <E >,
195+ @Volatile @JvmField var subHead : Long // guarded by lock
196+ ) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
197+ private val lock = ReentrantLock ()
198+
199+ override val isBufferAlwaysEmpty: Boolean get() = false
200+ override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
201+ override val isBufferAlwaysFull: Boolean get() = error(" Should not be used" )
202+ override val isBufferFull: Boolean get() = error(" Should not be used" )
203+
204+ override fun close () {
205+ if (close(cause = null ))
206+ broadcastChannel.closeSubscriber(this )
207+ }
208+
209+ // returns true if subHead was updated and broadcast channel's head must be checked
210+ // this method is lock-free (it never waits on lock)
211+ @Suppress(" UNCHECKED_CAST" )
212+ fun checkOffer (): Boolean {
213+ var updated = false
214+ var closed: Closed <* >? = null
215+ loop@
216+ while (needsToCheckOfferWithoutLock()) {
217+ // just use `tryLock` here and break when some other thread is checking under lock
218+ // it means that `checkOffer` must be retried after every `unlock`
219+ if (! lock.tryLock()) break
220+ val receive: ReceiveOrClosed <E >?
221+ val token: Any?
222+ try {
223+ val result = peekUnderLock()
224+ when {
225+ result == = POLL_FAILED -> continue @loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
226+ result is Closed <* > -> {
227+ closed = result
228+ break @loop // was closed
229+ }
230+ }
231+ // find a receiver for an element
232+ receive = takeFirstReceiveOrPeekClosed() ? : break // break when no one's receiving
233+ if (receive is Closed <* >) break // noting more to do if this sub already closed
234+ token = receive.tryResumeReceive(result as E , idempotent = null )
235+ if (token == null ) continue // bail out here to next iteration (see for next receiver)
236+ val subHead = this .subHead
237+ this .subHead = subHead + 1 // retrieved element for this subscriber
238+ updated = true
239+ } finally {
240+ lock.unlock()
241+ }
242+ receive!! .completeResumeReceive(token!! )
243+ }
244+ // do close outside of lock if needed
245+ closed?.also { close(cause = it.closeCause) }
246+ return updated
247+ }
248+
249+ // result is `E | POLL_FAILED | Closed`
250+ override fun pollInternal (): Any? {
251+ var updated = false
252+ val result: Any?
253+ lock.lock()
254+ try {
255+ result = peekUnderLock()
256+ when {
257+ result is Closed <* > -> { /* just bail out of lock */ }
258+ result == = POLL_FAILED -> { /* just bail out of lock */ }
259+ else -> {
260+ // update subHead after retrieiving element from buffer
261+ val subHead = this .subHead
262+ this .subHead = subHead + 1
263+ updated = true
264+ }
265+ }
266+ } finally {
267+ lock.unlock()
268+ }
269+ // do close outside of lock
270+ (result as ? Closed <* >)?.also { close(cause = it.closeCause) }
271+ // there could have been checkOffer attempt while we were holding lock
272+ // now outside the lock recheck if anything else to offer
273+ if (checkOffer())
274+ updated = true
275+ // and finally update broadcast's channel head if needed
276+ if (updated)
277+ broadcastChannel.updateHead()
278+ return result
279+ }
280+
281+ // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
282+ override fun pollSelectInternal (select : SelectInstance <* >): Any? {
283+ var updated = false
284+ var result: Any?
285+ lock.lock()
286+ try {
287+ result = peekUnderLock()
288+ when {
289+ result is Closed <* > -> { /* just bail out of lock */ }
290+ result == = POLL_FAILED -> { /* just bail out of lock */ }
291+ else -> {
292+ // let's try to select receiving this element from buffer
293+ if (! select.trySelect(null )) { // :todo: move trySelect completion outside of lock
294+ result = ALREADY_SELECTED
295+ } else {
296+ // update subHead after retrieiving element from buffer
297+ val subHead = this .subHead
298+ this .subHead = subHead + 1
299+ updated = true
300+ }
301+ }
302+ }
303+ } finally {
304+ lock.unlock()
305+ }
306+ // do close outside of lock
307+ (result as ? Closed <* >)?.also { close(cause = it.closeCause) }
308+ // there could have been checkOffer attempt while we were holding lock
309+ // now outside the lock recheck if anything else to offer
310+ if (checkOffer())
311+ updated = true
312+ // and finally update broadcast's channel head if needed
313+ if (updated)
314+ broadcastChannel.updateHead()
315+ return result
316+ }
317+
318+ // Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
319+ // to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
320+ // subscription about an element that was just offered
321+ private fun needsToCheckOfferWithoutLock (): Boolean {
322+ if (closedForReceive != null )
323+ return false // already closed -> nothing to do
324+ if (isBufferEmpty && broadcastChannel.closedForReceive == null )
325+ return false // no data for us && broadcast channel was not closed yet -> nothing to do
326+ return true // check otherwise
327+ }
328+
329+ // guarded by lock, returns:
330+ // E - the element from the buffer at subHead
331+ // Closed<*> when closed;
332+ // POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
333+ private fun peekUnderLock (): Any? {
334+ val subHead = this .subHead // guarded read (can be non-volatile read)
335+ // note: from the broadcastChannel we must read closed token first, then read its tail
336+ // because it is Ok if tail moves in between the reads (we make decision based on tail first)
337+ val closed = broadcastChannel.closedForReceive // unguarded volatile read
338+ val tail = broadcastChannel.tail // unguarded volatile read
339+ if (subHead >= tail) {
340+ // no elements to poll from the queue -- check if closed
341+ return closed ? : POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
342+ }
343+ return broadcastChannel.elementAt(subHead)
344+ }
345+ }
346+ }
0 commit comments