1+ /*
2+ * Copyright 2016-2017 JetBrains s.r.o.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package kotlinx.coroutines.experimental.selects
18+
19+ import kotlinx.coroutines.experimental.Deferred
20+ import kotlinx.coroutines.experimental.Job
21+ import kotlinx.coroutines.experimental.channels.ReceiveChannel
22+ import kotlinx.coroutines.experimental.channels.SendChannel
23+ import java.util.*
24+ import kotlin.coroutines.experimental.Continuation
25+ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
26+
27+ /* *
28+ * Waits for the result of multiple suspending functions simultaneously like [select], but in an _unbiased_
29+ * way when multiple clauses are selectable at the same time.
30+ *
31+ * This unbiased implementation of `select` expression randomly shuffles the clauses before checking
32+ * if they are selectable, thus ensuring that there is no statistical bias to the selection of the first
33+ * clauses.
34+ *
35+ * See [select] function description for all the other details.
36+ */
37+ public inline suspend fun <R > selectUnbiased (crossinline builder : SelectBuilder <R >.() -> Unit ): R =
38+ suspendCoroutineOrReturn { cont ->
39+ val scope = UnbiasedSelectBuilderImpl (cont)
40+ try {
41+ builder(scope)
42+ } catch (e: Throwable ) {
43+ scope.handleBuilderException(e)
44+ }
45+ scope.initSelectResult()
46+ }
47+
48+
49+ @PublishedApi
50+ internal class UnbiasedSelectBuilderImpl <in R >(cont : Continuation <R >) : SelectBuilder<R> {
51+ val instance = SelectBuilderImpl (cont)
52+ val clauses = arrayListOf< () -> Unit > ()
53+
54+ @PublishedApi
55+ internal fun handleBuilderException (e : Throwable ) = instance.handleBuilderException(e)
56+
57+ @PublishedApi
58+ internal fun initSelectResult (): Any? {
59+ if (! instance.isSelected) {
60+ try {
61+ Collections .shuffle(clauses)
62+ clauses.forEach { it.invoke() }
63+ } catch (e: Throwable ) {
64+ instance.handleBuilderException(e)
65+ }
66+ }
67+ return instance.initSelectResult()
68+ }
69+
70+ override fun Job.onJoin (block : suspend () -> R ) {
71+ clauses + = { registerSelectJoin(instance, block) }
72+ }
73+
74+ override fun <T > Deferred<T>.onAwait (block : suspend (T ) -> R ) {
75+ clauses + = { registerSelectAwait(instance, block) }
76+ }
77+
78+ override fun <E > SendChannel<E>.onSend (element : E , block : suspend () -> R ) {
79+ clauses + = { registerSelectSend(instance, element, block) }
80+ }
81+
82+ override fun <E > ReceiveChannel<E>.onReceive (block : suspend (E ) -> R ) {
83+ clauses + = { registerSelectReceive(instance, block) }
84+ }
85+
86+ override fun <E > ReceiveChannel<E>.onReceiveOrNull (block : suspend (E ? ) -> R ) {
87+ clauses + = { registerSelectReceiveOrNull(instance, block) }
88+ }
89+ }
0 commit comments