@@ -290,35 +290,33 @@ internal class CoroutineScheduler(
290290
291291 override fun execute (command : Runnable ) = dispatch(command)
292292
293- override fun close () = shutdown(1000L )
293+ override fun close () = shutdown(10_000L )
294294
295- /*
296- * Shuts down current scheduler and waits until all threads are stopped.
297- * This method uses unsafe API (does unconditional unparks)
298- * and intended to be used only for testing. Invocation has no additional effect if already closed.
299- */
295+ // Shuts down current scheduler and waits until all work is done and all threads are stopped.
300296 fun shutdown (timeout : Long ) {
301297 // atomically set termination flag which is checked when workers are added or removed
302298 if (! isTerminated.compareAndSet(false , true )) return
303-
304- /*
305- * Shutdown current thread. Note that shutdown is testing utility,
306- * so we don't do anything special to properly verify that no tasks are submitted after close()
307- */
308- val thread = Thread .currentThread()
309- (thread as ? Worker )?.tryReleaseCpu(WorkerState .TERMINATED )
310-
299+ // make sure we are not waiting for the current thread
300+ val currentWorker = Thread .currentThread() as ? Worker
311301 // Capture # of created workers that cannot change anymore (mind the synchronized block!)
312302 val created = synchronized(workers) { createdWorkers }
313303 for (i in 1 .. created) {
314304 val worker = workers[i]!!
315- if (worker.isAlive) {
316- // Unparking alive thread is unsafe in general, but acceptable for testing purposes
305+ if (worker.isAlive && worker != = currentWorker) {
317306 LockSupport .unpark(worker)
318307 worker.join(timeout)
308+ worker.localQueue.offloadAllWork(globalQueue)
319309 }
310+
311+ }
312+ // Finish processing tasks from globalQueue and/or from this worker's local queue
313+ while (true ) {
314+ val task = currentWorker?.findTask() ? : globalQueue.removeFirstOrNull() ? : break
315+ runSafely(task)
320316 }
321- // cleanup state to make sure that tryUnpark tries to create new threads and crashes because it isTerminated
317+ // Shutdown current thread
318+ currentWorker?.tryReleaseCpu(WorkerState .TERMINATED )
319+ // cleanup state to make sure that tryUnpark tries to create new threads and fails because isTerminated
322320 assert (cpuPermits.availablePermits() == corePoolSize)
323321 parkedWorkersStack.value = 0L
324322 controlState.value = 0L
@@ -333,6 +331,7 @@ internal class CoroutineScheduler(
333331 * @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
334332 */
335333 fun dispatch (block : Runnable , taskContext : TaskContext = NonBlockingContext , fair : Boolean = false) {
334+ timeSource.trackTask() // this is needed for virtual time support
336335 // TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
337336 val task = Task (block, schedulerTimeSource.nanoTime(), taskContext)
338337 // try to submit the task to the local queue and act depending on the result
@@ -439,7 +438,7 @@ internal class CoroutineScheduler(
439438 private fun createNewWorker (): Int {
440439 synchronized(workers) {
441440 // Make sure we're not trying to resurrect terminated scheduler
442- if (isTerminated.value) throw ShutdownException ( )
441+ if (isTerminated.value) throw RejectedExecutionException ( " $schedulerName was terminated " )
443442 val state = controlState.value
444443 val created = createdWorkers(state)
445444 val blocking = blockingWorkers(state)
@@ -456,9 +455,6 @@ internal class CoroutineScheduler(
456455 }
457456 }
458457
459- // Is thrown when attempting to create new worker, but this scheduler isTerminated
460- private class ShutdownException : RuntimeException ()
461-
462458 /* *
463459 * Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
464460 */
@@ -565,6 +561,17 @@ internal class CoroutineScheduler(
565561 " ]"
566562 }
567563
564+ private fun runSafely (task : Task ) {
565+ try {
566+ task.run ()
567+ } catch (e: Throwable ) {
568+ val thread = Thread .currentThread()
569+ thread.uncaughtExceptionHandler.uncaughtException(thread, e)
570+ } finally {
571+ timeSource.unTrackTask()
572+ }
573+ }
574+
568575 internal inner class Worker private constructor() : Thread() {
569576 init {
570577 isDaemon = true
@@ -685,41 +692,28 @@ internal class CoroutineScheduler(
685692 private var lastStealIndex = 0 // try in order repeated, reset when unparked
686693
687694 override fun run () {
688- try {
689- var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
690- while (! isTerminated.value && state != WorkerState .TERMINATED ) {
691- val task = findTask()
692- if (task == null ) {
693- // Wait for a job with potential park
694- if (state == WorkerState .CPU_ACQUIRED ) {
695- cpuWorkerIdle()
696- } else {
697- blockingWorkerIdle()
698- }
699- wasIdle = true
695+ var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
696+ while (! isTerminated.value && state != WorkerState .TERMINATED ) {
697+ val task = findTask()
698+ if (task == null ) {
699+ // Wait for a job with potential park
700+ if (state == WorkerState .CPU_ACQUIRED ) {
701+ cpuWorkerIdle()
700702 } else {
701- if (wasIdle) {
702- idleReset(task.mode)
703- wasIdle = false
704- }
705- beforeTask(task)
706- runSafely(task)
707- afterTask(task)
703+ blockingWorkerIdle()
708704 }
705+ wasIdle = true
706+ } else {
707+ if (wasIdle) {
708+ idleReset(task.mode)
709+ wasIdle = false
710+ }
711+ beforeTask(task)
712+ runSafely(task)
713+ afterTask(task)
709714 }
710- } catch (e: ShutdownException ) {
711- // race with shutdown -- ignore exception and don't print it on the console
712- } finally {
713- tryReleaseCpu(WorkerState .TERMINATED )
714- }
715- }
716-
717- private fun runSafely (task : Task ) {
718- try {
719- task.run ()
720- } catch (t: Throwable ) {
721- uncaughtExceptionHandler.uncaughtException(this , t)
722715 }
716+ tryReleaseCpu(WorkerState .TERMINATED )
723717 }
724718
725719 private fun beforeTask (task : Task ) {
@@ -823,7 +817,7 @@ internal class CoroutineScheduler(
823817 private fun tryTerminateWorker () {
824818 synchronized(workers) {
825819 // Make sure we're not trying race with termination of scheduler
826- if (isTerminated.value) throw ShutdownException ()
820+ if (isTerminated.value) return
827821 // Someone else terminated, bail out
828822 if (createdWorkers <= corePoolSize) return
829823 // Try to find blocking task before termination
@@ -906,7 +900,7 @@ internal class CoroutineScheduler(
906900 spins = 0 // Volatile write, should be written last
907901 }
908902
909- private fun findTask (): Task ? {
903+ internal fun findTask (): Task ? {
910904 if (tryAcquireCpuPermit()) return findTaskWithCpuPermit()
911905 /*
912906 * If the local queue is empty, try to extract blocking task from global queue.
0 commit comments