1+ /*
2+ * Copyright 2016-2017 JetBrains s.r.o.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package kotlinx.coroutines.experimental.channels
18+
19+ import kotlinx.coroutines.experimental.MODE_DIRECT
20+ import kotlinx.coroutines.experimental.internal.Symbol
21+ import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
22+ import kotlinx.coroutines.experimental.selects.SelectInstance
23+ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
24+ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
25+
26+ /* *
27+ * Broadcasts the most recently sent value (aka [value]) to all [open] subscribers.
28+ *
29+ * Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
30+ * while previously sent elements **are lost**.
31+ * Sender to this broadcast channel never suspends and [offer] always returns `true`.
32+ *
33+ * A secondary constructor can be used to create an instance of this class that already holds a value.
34+ *
35+ * This implementation is fully lock-free. In this implementation
36+ * [opening][open] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
37+ * number of subscribers.
38+ */
39+ public class ValueBroadcastChannel <E >() : BroadcastChannel<E> {
40+ /* *
41+ * Creates an instance of this class that already holds a value.
42+ *
43+ * It is as a shortcut to creating an instance with a default constructor and
44+ * immediately sending a value: `ValueBroadcastChannel().apply { offer(value) }`.
45+ */
46+ constructor (value: E ) : this () {
47+ state = State <E >(value, null )
48+ }
49+
50+ @Suppress(" UNCHECKED_CAST" )
51+ @Volatile
52+ private var state: Any = INITIAL_STATE // State | Closed
53+
54+ @Volatile
55+ private var updating = 0
56+
57+ private companion object {
58+ @JvmField
59+ val STATE : AtomicReferenceFieldUpdater <ValueBroadcastChannel <* >, Any > = AtomicReferenceFieldUpdater .
60+ newUpdater(ValueBroadcastChannel ::class .java, Any ::class .java, " state" )
61+
62+ @JvmField
63+ val UPDATING : AtomicIntegerFieldUpdater <ValueBroadcastChannel <* >> = AtomicIntegerFieldUpdater .
64+ newUpdater(ValueBroadcastChannel ::class .java, " updating" )
65+
66+ @JvmField
67+ val CLOSED = Closed (null )
68+
69+ @JvmField
70+ val UNDEFINED = Symbol (" UNDEFINED" )
71+
72+ @JvmField
73+ val INITIAL_STATE = State <Any ?>(UNDEFINED , null )
74+ }
75+
76+ private class State <E >(
77+ @JvmField val value : Any? , // UNDEFINED | E
78+ @JvmField val subscribers : Array <Subscriber <E >>?
79+ )
80+
81+ private class Closed (@JvmField val closeCause : Throwable ? ) {
82+ val sendException: Throwable get() = closeCause ? : ClosedSendChannelException (DEFAULT_CLOSE_MESSAGE )
83+ val valueException: Throwable get() = closeCause ? : IllegalStateException (DEFAULT_CLOSE_MESSAGE )
84+ }
85+
86+ /* *
87+ * The most recently sent element to this channel.
88+ *
89+ * Access to this property throws [IllegalStateException] when this class is constructed without
90+ * initial value and no value was sent yet or if it was [closed][close] _normally_ and
91+ * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
92+ */
93+ @Suppress(" UNCHECKED_CAST" )
94+ public val value: E get() {
95+ val state = this .state
96+ when (state) {
97+ is Closed -> throw state.valueException
98+ is State <* > -> {
99+ if (state.value == = UNDEFINED ) throw IllegalStateException (" No value" )
100+ return state.value as E
101+ }
102+ else -> error(" Invalid state $state " )
103+ }
104+ }
105+
106+ /* *
107+ * The most recently sent element to this channel or `null` when this class is constructed without
108+ * initial value and no value was sent yet or if it was [closed][close].
109+ */
110+ @Suppress(" UNCHECKED_CAST" )
111+ public val valueOrNull: E ? get() {
112+ val state = this .state
113+ when (state) {
114+ is Closed -> return null
115+ is State <* > -> {
116+ if (state.value == = UNDEFINED ) return null
117+ return state.value as E
118+ }
119+ else -> error(" Invalid state $state " )
120+ }
121+ }
122+
123+ override val isClosedForSend: Boolean get() = state is Closed
124+ override val isFull: Boolean get() = false
125+
126+ @Suppress(" UNCHECKED_CAST" )
127+ override fun open (): SubscriptionReceiveChannel <E > {
128+ val subscriber = Subscriber <E >(this )
129+ while (true ) { // lock-free loop on state
130+ val state = this .state
131+ when (state) {
132+ is Closed -> {
133+ subscriber.close(state.closeCause)
134+ return subscriber
135+ }
136+ is State <* > -> {
137+ if (state.value != = UNDEFINED )
138+ subscriber.offerInternal(state.value as E )
139+ val update = State (state.value, addSubscriber((state as State <E >).subscribers, subscriber))
140+ if (STATE .compareAndSet(this , state, update))
141+ return subscriber
142+ }
143+ else -> error(" Invalid state $state " )
144+ }
145+ }
146+ }
147+
148+ @Suppress(" UNCHECKED_CAST" )
149+ private fun closeSubscriber (subscriber : Subscriber <E >) {
150+ while (true ) { // lock-free loop on state
151+ val state = this .state
152+ when (state) {
153+ is Closed -> return
154+ is State <* > -> {
155+ val update = State (state.value, removeSubscriber((state as State <E >).subscribers!! , subscriber))
156+ if (STATE .compareAndSet(this , state, update))
157+ return
158+ }
159+ else -> error(" Invalid state $state " )
160+ }
161+ }
162+ }
163+
164+ private fun addSubscriber (list : Array <Subscriber <E >>? , subscriber : Subscriber <E >): Array <Subscriber <E >> {
165+ if (list == null ) return Array <Subscriber <E >>(1 ) { subscriber }
166+ return list + subscriber
167+ }
168+
169+ @Suppress(" UNCHECKED_CAST" )
170+ private fun removeSubscriber (list : Array <Subscriber <E >>, subscriber : Subscriber <E >): Array <Subscriber <E >>? {
171+ val n = list.size
172+ val i = list.indexOf(subscriber)
173+ check(i >= 0 )
174+ if (n == 1 ) return null
175+ val update = arrayOfNulls<Subscriber <E >>(n - 1 )
176+ System .arraycopy(list, 0 , update, 0 , i)
177+ System .arraycopy(list, i + 1 , update, i, n - i - 1 )
178+ return update as Array <Subscriber <E >>
179+ }
180+
181+ @Suppress(" UNCHECKED_CAST" )
182+ override fun close (cause : Throwable ? ): Boolean {
183+ while (true ) { // lock-free loop on state
184+ val state = this .state
185+ when (state) {
186+ is Closed -> return false
187+ is State <* > -> {
188+ val update = if (cause == null ) CLOSED else Closed (cause)
189+ if (STATE .compareAndSet(this , state, update)) {
190+ (state as State <E >).subscribers?.forEach { it.close(cause) }
191+ return true
192+ }
193+ }
194+ else -> error(" Invalid state $state " )
195+ }
196+ }
197+ }
198+
199+ /* *
200+ * Sends the value to all subscribed receives and stores this value as the most recent state for
201+ * future subscribers. This implementation never suspends.
202+ *
203+ * It throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
204+ * It throws the original [close] cause exception if the channel has _failed_.
205+ */
206+ suspend override fun send (element : E ) {
207+ offerInternal(element)?.let { throw it.sendException }
208+ }
209+
210+ /* *
211+ * Sends the value to all subscribed receives and stores this value as the most recent state for
212+ * future subscribers. This implementation always returns `true`.
213+ *
214+ * It throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
215+ * It throws the original [close] cause exception if the channel has _failed_.
216+ */
217+ override fun offer (element : E ): Boolean {
218+ offerInternal(element)?.let { throw it.sendException }
219+ return true
220+ }
221+
222+ @Suppress(" UNCHECKED_CAST" )
223+ private fun offerInternal (element : E ): Closed ? {
224+ // If some other thread is updating the state in its offer operation we assume that our offer had linearized
225+ // before that offer (we lost) and that offer overwrote us and conflated our offer.
226+ if (! UPDATING .compareAndSet(this , 0 , 1 )) return null
227+ try {
228+ while (true ) { // lock-free loop on state
229+ val state = this .state
230+ when (state) {
231+ is Closed -> return state
232+ is State <* > -> {
233+ val update = State (element, (state as State <E >).subscribers)
234+ if (STATE .compareAndSet(this , state, update)) {
235+ // Note: Using offerInternal here to ignore the case when this subscriber was
236+ // already concurrently closed (assume the close had conflated our offer for this
237+ // particular subscriber).
238+ state.subscribers?.forEach { it.offerInternal(element) }
239+ return null
240+ }
241+ }
242+ else -> error(" Invalid state $state " )
243+ }
244+ }
245+ } finally {
246+ updating = 0 // reset the updating flag to zero even when something goes wrong
247+ }
248+ }
249+
250+ override fun <R > registerSelectSend (select : SelectInstance <R >, element : E , block : suspend () -> R ) {
251+ if (! select.trySelect(idempotent = null )) return
252+ offerInternal(element)?.let {
253+ select.resumeSelectWithException(it.sendException, MODE_DIRECT )
254+ return
255+ }
256+ block.startCoroutineUndispatched(select.completion)
257+ }
258+
259+ private class Subscriber <E >(
260+ private val broadcastChannel : ValueBroadcastChannel <E >
261+ ) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
262+ override fun close () {
263+ if (close(cause = null ))
264+ broadcastChannel.closeSubscriber(this )
265+ }
266+
267+ public override fun offerInternal (element : E ): Any = super .offerInternal(element)
268+ }
269+ }
0 commit comments