44
55package kotlinx.coroutines.rx2
66
7- import io.reactivex.Scheduler
7+ import io.reactivex.*
8+ import io.reactivex.disposables.*
9+ import io.reactivex.plugins.*
10+ import kotlinx.atomicfu.*
811import kotlinx.coroutines.*
9- import java.util.concurrent.TimeUnit
10- import kotlin.coroutines.CoroutineContext
12+ import kotlinx.coroutines.channels.*
13+ import java.util.concurrent.*
14+ import kotlin.coroutines.*
1115
1216/* *
1317 * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
1418 * and provides native support of [delay] and [withTimeout].
1519 */
16- public fun Scheduler.asCoroutineDispatcher (): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher (this )
20+ public fun Scheduler.asCoroutineDispatcher (): CoroutineDispatcher =
21+ if (this is DispatcherScheduler ) {
22+ dispatcher
23+ } else {
24+ SchedulerCoroutineDispatcher (this )
25+ }
26+
27+ @Deprecated(level = DeprecationLevel .HIDDEN , message = " Since 1.4.2, binary compatibility with earlier versions" )
28+ @JvmName(" asCoroutineDispatcher" )
29+ public fun Scheduler.asCoroutineDispatcher0 (): SchedulerCoroutineDispatcher =
30+ SchedulerCoroutineDispatcher (this )
31+
32+ /* *
33+ * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
34+ */
35+ public fun CoroutineDispatcher.asScheduler (): Scheduler =
36+ if (this is SchedulerCoroutineDispatcher ) {
37+ scheduler
38+ } else {
39+ DispatcherScheduler (this )
40+ }
41+
42+ private class DispatcherScheduler (@JvmField val dispatcher : CoroutineDispatcher ) : Scheduler() {
43+
44+ private val schedulerJob = SupervisorJob ()
45+
46+ /* *
47+ * The scope for everything happening in this [DispatcherScheduler].
48+ *
49+ * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
50+ */
51+ private val scope = CoroutineScope (schedulerJob + dispatcher)
52+
53+ /* *
54+ * The counter of created workers, for their pretty-printing.
55+ */
56+ private val workerCounter = atomic(1L )
57+
58+ override fun scheduleDirect (block : Runnable , delay : Long , unit : TimeUnit ): Disposable =
59+ scope.scheduleTask(block, unit.toMillis(delay)) { task ->
60+ Runnable { scope.launch { task() } }
61+ }
62+
63+ override fun createWorker (): Worker = DispatcherWorker (workerCounter.getAndIncrement(), dispatcher, schedulerJob)
64+
65+ override fun shutdown () {
66+ schedulerJob.cancel()
67+ }
68+
69+ private class DispatcherWorker (
70+ private val counter : Long ,
71+ private val dispatcher : CoroutineDispatcher ,
72+ parentJob : Job
73+ ) : Worker() {
74+
75+ private val workerJob = SupervisorJob (parentJob)
76+ private val workerScope = CoroutineScope (workerJob + dispatcher)
77+ private val blockChannel = Channel < suspend () -> Unit > (Channel .UNLIMITED )
78+
79+ init {
80+ workerScope.launch {
81+ blockChannel.consumeEach {
82+ it()
83+ }
84+ }
85+ }
86+
87+ override fun schedule (block : Runnable , delay : Long , unit : TimeUnit ): Disposable =
88+ workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
89+ Runnable { blockChannel.trySend(task) }
90+ }
91+
92+ override fun isDisposed (): Boolean = ! workerScope.isActive
93+
94+ override fun dispose () {
95+ blockChannel.close()
96+ workerJob.cancel()
97+ }
98+
99+ override fun toString (): String = " $dispatcher (worker $counter , ${if (isDisposed) " disposed" else " active" } )"
100+ }
101+
102+ override fun toString (): String = dispatcher.toString()
103+ }
104+
105+ private typealias Task = suspend () -> Unit
106+
107+ /* *
108+ * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
109+ * milliseconds.
110+ */
111+ private fun CoroutineScope.scheduleTask (
112+ block : Runnable ,
113+ delayMillis : Long ,
114+ adaptForScheduling : (Task ) -> Runnable
115+ ): Disposable {
116+ val ctx = coroutineContext
117+ var handle: DisposableHandle ? = null
118+ val disposable = Disposables .fromRunnable {
119+ // null if delay <= 0
120+ handle?.dispose()
121+ }
122+ val decoratedBlock = RxJavaPlugins .onSchedule(block)
123+ suspend fun task () {
124+ if (disposable.isDisposed) return
125+ try {
126+ runInterruptible {
127+ decoratedBlock.run ()
128+ }
129+ } catch (e: Throwable ) {
130+ handleUndeliverableException(e, ctx)
131+ }
132+ }
133+
134+ val toSchedule = adaptForScheduling(::task)
135+ if (! isActive) return Disposables .disposed()
136+ if (delayMillis <= 0 ) {
137+ toSchedule.run ()
138+ } else {
139+ @Suppress(" INVISIBLE_MEMBER" )
140+ ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
141+ }
142+ return disposable
143+ }
17144
18145/* *
19146 * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
@@ -45,8 +172,10 @@ public class SchedulerCoroutineDispatcher(
45172
46173 /* * @suppress */
47174 override fun toString (): String = scheduler.toString()
175+
48176 /* * @suppress */
49177 override fun equals (other : Any? ): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler == = scheduler
178+
50179 /* * @suppress */
51180 override fun hashCode (): Int = System .identityHashCode(scheduler)
52- }
181+ }
0 commit comments