@@ -15,18 +15,18 @@ import kotlin.jvm.*
1515 *
1616 * It may optionally implement [Delay] interface and support time-scheduled tasks.
1717 * It is created or pigged back onto (see [ThreadLocalEventLoop])
18- * by [ runBlocking] and by [Dispatchers.Unconfined].
18+ * by ` runBlocking` and by [Dispatchers.Unconfined].
1919 *
2020 * @suppress **This an internal API and should not be used from general code.**
2121 */
2222internal abstract class EventLoop : CoroutineDispatcher () {
2323 /* *
24- * Counts the number of nested [ runBlocking] and [Dispatchers.Unconfined] that use this event loop.
24+ * Counts the number of nested ` runBlocking` and [Dispatchers.Unconfined] that use this event loop.
2525 */
2626 private var useCount = 0L
2727
2828 /* *
29- * Set to true on any use by [ runBlocking] , because it potentially leaks this loop to other threads, so
29+ * Set to true on any use by ` runBlocking` , because it potentially leaks this loop to other threads, so
3030 * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
3131 * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
3232 */
@@ -147,6 +147,12 @@ private const val SCHEDULE_DISPOSED = 2
147147private const val MS_TO_NS = 1_000_000L
148148private const val MAX_MS = Long .MAX_VALUE / MS_TO_NS
149149
150+ /* *
151+ * First-line overflow protection -- limit maximal delay.
152+ * Delays longer than this one (~146 years) are considered to be delayed "forever".
153+ */
154+ private const val MAX_DELAY_NS = Long .MAX_VALUE / 2
155+
150156internal fun delayToNanos (timeMillis : Long ): Long = when {
151157 timeMillis <= 0 -> 0L
152158 timeMillis >= MAX_MS -> Long .MAX_VALUE
@@ -162,15 +168,19 @@ private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
162168private typealias Queue <T > = LockFreeTaskQueueCore <T >
163169
164170internal expect abstract class EventLoopImplPlatform () : EventLoop {
171+ // Called to unpark this event loop's thread
165172 protected fun unpark ()
173+
174+ // Called to reschedule to DefaultExecutor when this event loop is complete
175+ protected fun reschedule (now : Long , delayedTask : EventLoopImplBase .DelayedTask )
166176}
167177
168178internal abstract class EventLoopImplBase : EventLoopImplPlatform (), Delay {
169179 // null | CLOSED_EMPTY | task | Queue<Runnable>
170180 private val _queue = atomic<Any ?>(null )
171181
172182 // Allocated only only once
173- private val _delayed = atomic<ThreadSafeHeap < DelayedTask > ? > (null )
183+ private val _delayed = atomic<DelayedTaskQueue ?>(null )
174184
175185 @Volatile
176186 private var isCompleted = false
@@ -197,8 +207,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
197207 queue == = CLOSED_EMPTY -> return Long .MAX_VALUE // no more events -- closed
198208 else -> return 0 // non-empty queue
199209 }
200- val delayed = _delayed .value ? : return Long .MAX_VALUE
201- val nextDelayedTask = delayed.peek() ? : return Long .MAX_VALUE
210+ val nextDelayedTask = _delayed .value?.peek() ? : return Long .MAX_VALUE
202211 return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0 )
203212 }
204213
@@ -215,8 +224,28 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
215224 rescheduleAllDelayed()
216225 }
217226
218- override fun scheduleResumeAfterDelay (timeMillis : Long , continuation : CancellableContinuation <Unit >) =
219- schedule(DelayedResumeTask (timeMillis, continuation))
227+ public override fun scheduleResumeAfterDelay (timeMillis : Long , continuation : CancellableContinuation <Unit >) {
228+ val timeNanos = delayToNanos(timeMillis)
229+ if (timeNanos < MAX_DELAY_NS ) {
230+ val now = nanoTime()
231+ DelayedResumeTask (now + timeNanos, continuation).also { task ->
232+ continuation.disposeOnCancellation(task)
233+ schedule(now, task)
234+ }
235+ }
236+ }
237+
238+ protected fun scheduleInvokeOnTimeout (timeMillis : Long , block : Runnable ): DisposableHandle {
239+ val timeNanos = delayToNanos(timeMillis)
240+ return if (timeNanos < MAX_DELAY_NS ) {
241+ val now = nanoTime()
242+ DelayedRunnableTask (now + timeNanos, block).also { task ->
243+ schedule(now, task)
244+ }
245+ } else {
246+ NonDisposableHandle
247+ }
248+ }
220249
221250 override fun processNextEvent (): Long {
222251 // unconfined events take priority
@@ -321,24 +350,24 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
321350
322351 }
323352
324- public fun schedule (delayedTask : DelayedTask ) {
325- when (scheduleImpl(delayedTask)) {
353+ public fun schedule (now : Long , delayedTask : DelayedTask ) {
354+ when (scheduleImpl(now, delayedTask)) {
326355 SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
327- SCHEDULE_COMPLETED -> DefaultExecutor .schedule( delayedTask)
356+ SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
328357 SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
329358 else -> error(" unexpected result" )
330359 }
331360 }
332361
333362 private fun shouldUnpark (task : DelayedTask ): Boolean = _delayed .value?.peek() == = task
334363
335- private fun scheduleImpl (delayedTask : DelayedTask ): Int {
364+ private fun scheduleImpl (now : Long , delayedTask : DelayedTask ): Int {
336365 if (isCompleted) return SCHEDULE_COMPLETED
337- val delayed = _delayed .value ? : run {
338- _delayed .compareAndSet(null , ThreadSafeHeap ( ))
366+ val delayedQueue = _delayed .value ? : run {
367+ _delayed .compareAndSet(null , DelayedTaskQueue (now ))
339368 _delayed .value!!
340369 }
341- return delayedTask.schedule(delayed , this )
370+ return delayedTask.scheduleTask(now, delayedQueue , this )
342371 }
343372
344373 // It performs "hard" shutdown for test cleanup purposes
@@ -349,6 +378,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
349378
350379 // This is a "soft" (normal) shutdown
351380 private fun rescheduleAllDelayed () {
381+ val now = nanoTime()
352382 while (true ) {
353383 /*
354384 * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
@@ -358,12 +388,16 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
358388 * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
359389 */
360390 val delayedTask = _delayed .value?.removeFirstOrNull() ? : break
361- delayedTask.rescheduleOnShutdown( )
391+ reschedule(now, delayedTask )
362392 }
363393 }
364394
365395 internal abstract class DelayedTask (
366- timeMillis : Long
396+ /* *
397+ * This field can be only modified in [scheduleTask] before putting this DelayedTask
398+ * into heap to avoid overflow and corruption of heap data structure.
399+ */
400+ @JvmField var nanoTime : Long
367401 ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
368402 private var _heap : Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
369403
@@ -376,8 +410,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
376410
377411 override var index: Int = - 1
378412
379- @JvmField val nanoTime: Long = nanoTime() + delayToNanos(timeMillis)
380-
381413 override fun compareTo (other : DelayedTask ): Int {
382414 val dTime = nanoTime - other.nanoTime
383415 return when {
@@ -390,47 +422,94 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
390422 fun timeToExecute (now : Long ): Boolean = now - nanoTime >= 0L
391423
392424 @Synchronized
393- fun schedule ( delayed : ThreadSafeHeap < DelayedTask > , eventLoop : EventLoopImplBase ): Int {
425+ fun scheduleTask ( now : Long , delayed : DelayedTaskQueue , eventLoop : EventLoopImplBase ): Int {
394426 if (_heap == = DISPOSED_TASK ) return SCHEDULE_DISPOSED // don't add -- was already disposed
395- return if (delayed.addLastIf(this ) { ! eventLoop.isCompleted }) SCHEDULE_OK else SCHEDULE_COMPLETED
427+ delayed.addLastIf(this ) { firstTask ->
428+ if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
429+ /* *
430+ * We are about to add new task and we have to make sure that [DelayedTaskQueue]
431+ * invariant is maintained. The code in this lambda is additionally executed under
432+ * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
433+ */
434+ if (firstTask == null ) {
435+ /* *
436+ * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
437+ * the current now time even if that means "going backwards in time". This makes the structure
438+ * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
439+ * are removed from the delayed queue for execution.
440+ */
441+ delayed.timeNow = now
442+ } else {
443+ /* *
444+ * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
445+ * and only goes forward in time. We cannot let it go backwards in time or invariant can be
446+ * violated for tasks that were already scheduled.
447+ */
448+ val firstTime = firstTask.nanoTime
449+ // compute min(now, firstTime) using a wrap-safe check
450+ val minTime = if (firstTime - now >= 0 ) now else firstTime
451+ // update timeNow only when going forward in time
452+ if (minTime - delayed.timeNow > 0 ) delayed.timeNow = minTime
453+ }
454+ /* *
455+ * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
456+ * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
457+ * function can be called to reschedule from one queue to another and this might be another reason
458+ * where new task's time might now violate invariant.
459+ * We correct invariant violation (if any) by simply changing this task's time to now.
460+ */
461+ if (nanoTime - delayed.timeNow < 0 ) nanoTime = delayed.timeNow
462+ true
463+ }
464+ return SCHEDULE_OK
396465 }
397466
398- // note: DefaultExecutor.schedule performs `schedule` (above) which does sync & checks for DISPOSED_TASK
399- fun rescheduleOnShutdown () = DefaultExecutor .schedule(this )
400-
401467 @Synchronized
402468 final override fun dispose () {
403469 val heap = _heap
404470 if (heap == = DISPOSED_TASK ) return // already disposed
405471 @Suppress(" UNCHECKED_CAST" )
406- (heap as ? ThreadSafeHeap < DelayedTask > )?.remove(this ) // remove if it is in heap (first)
472+ (heap as ? DelayedTaskQueue )?.remove(this ) // remove if it is in heap (first)
407473 _heap = DISPOSED_TASK // never add again to any heap
408474 }
409475
410476 override fun toString (): String = " Delayed[nanos=$nanoTime ]"
411477 }
412478
413479 private inner class DelayedResumeTask (
414- timeMillis : Long ,
480+ nanoTime : Long ,
415481 private val cont : CancellableContinuation <Unit >
416- ) : DelayedTask(timeMillis) {
417- init {
418- // Note that this operation isn't lock-free, but very short
419- cont.disposeOnCancellation(this )
420- }
421-
422- override fun run () {
423- with (cont) { resumeUndispatched(Unit ) }
424- }
482+ ) : DelayedTask(nanoTime) {
483+ override fun run () { with (cont) { resumeUndispatched(Unit ) } }
484+ override fun toString (): String = super .toString() + cont.toString()
425485 }
426486
427- internal class DelayedRunnableTask (
428- time : Long ,
487+ private class DelayedRunnableTask (
488+ nanoTime : Long ,
429489 private val block : Runnable
430- ) : DelayedTask(time ) {
490+ ) : DelayedTask(nanoTime ) {
431491 override fun run () { block.run () }
432492 override fun toString (): String = super .toString() + block.toString()
433493 }
494+
495+ /* *
496+ * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
497+ * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
498+ * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
499+ * The invariant is that for every scheduled [DelayedTask]:
500+ *
501+ * ```
502+ * delayedTask.nanoTime - timeNow >= 0
503+ * ```
504+ *
505+ * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
506+ * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
507+ * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
508+ * (so there is nothing special to do there).
509+ */
510+ internal class DelayedTaskQueue (
511+ @JvmField var timeNow : Long
512+ ) : ThreadSafeHeap<DelayedTask>()
434513}
435514
436515internal expect fun createEventLoop (): EventLoop
@@ -439,6 +518,5 @@ internal expect fun nanoTime(): Long
439518
440519internal expect object DefaultExecutor {
441520 public fun enqueue (task : Runnable )
442- public fun schedule (delayedTask : EventLoopImplBase .DelayedTask )
443521}
444522
0 commit comments