@@ -43,38 +43,39 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
4343 @Volatile
4444 private var debugStatus: Int = FRESH
4545
46+ private val isShutdownRequested: Boolean get() {
47+ val debugStatus = debugStatus
48+ return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
49+ }
50+
4651 override fun run () {
4752 timeSource.registerTimeLoopThread()
4853 try {
4954 var shutdownNanos = Long .MAX_VALUE
50- if (notifyStartup()) {
51- runLoop@ while (true ) {
52- Thread .interrupted() // just reset interruption flag
53- var parkNanos = processNextEvent()
54- if (parkNanos == Long .MAX_VALUE ) {
55- // nothing to do, initialize shutdown timeout
56- if (shutdownNanos == Long .MAX_VALUE ) {
57- val now = timeSource.nanoTime()
58- if (shutdownNanos == Long .MAX_VALUE ) shutdownNanos = now + KEEP_ALIVE_NANOS
59- val tillShutdown = shutdownNanos - now
60- if (tillShutdown <= 0 ) break @runLoop // shut thread down
61- parkNanos = parkNanos.coerceAtMost(tillShutdown)
62- } else
63- parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS ) // limit wait time anyway
64- }
65- if (parkNanos > 0 ) {
66- // check if shutdown was requested and bail out in this case
67- if (debugStatus == SHUTDOWN_REQ ) {
68- acknowledgeShutdown()
69- break @runLoop
70- } else {
71- timeSource.parkNanos(this , parkNanos)
72- }
73- }
55+ if (! notifyStartup()) return
56+ while (true ) {
57+ Thread .interrupted() // just reset interruption flag
58+ var parkNanos = processNextEvent()
59+ if (parkNanos == Long .MAX_VALUE ) {
60+ // nothing to do, initialize shutdown timeout
61+ if (shutdownNanos == Long .MAX_VALUE ) {
62+ val now = timeSource.nanoTime()
63+ if (shutdownNanos == Long .MAX_VALUE ) shutdownNanos = now + KEEP_ALIVE_NANOS
64+ val tillShutdown = shutdownNanos - now
65+ if (tillShutdown <= 0 ) return // shut thread down
66+ parkNanos = parkNanos.coerceAtMost(tillShutdown)
67+ } else
68+ parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS ) // limit wait time anyway
69+ }
70+ if (parkNanos > 0 ) {
71+ // check if shutdown was requested and bail out in this case
72+ if (isShutdownRequested) return
73+ timeSource.parkNanos(this , parkNanos)
7474 }
7575 }
7676 } finally {
7777 _thread = null // this thread is dead
78+ acknowledgeShutdownIfNeeded()
7879 timeSource.unregisterTimeLoopThread()
7980 // recheck if queues are empty after _thread reference was set to null (!!!)
8081 if (! isEmpty) thread() // recreate thread if it is needed
@@ -110,32 +111,31 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
110111
111112 @Synchronized
112113 private fun notifyStartup (): Boolean {
113- if (debugStatus == SHUTDOWN_REQ ) return false
114+ if (isShutdownRequested ) return false
114115 debugStatus = ACTIVE
115116 (this as Object ).notifyAll()
116117 return true
117118 }
118119
119120 // used for tests
120121 @Synchronized
121- internal fun shutdown (timeout : Long ) {
122- if (_thread != null ) {
123- val deadline = System .currentTimeMillis() + timeout
124- if (debugStatus == ACTIVE || debugStatus == FRESH ) debugStatus = SHUTDOWN_REQ
125- unpark()
126- // loop while there is anything to do immediately or deadline passes
127- while (debugStatus != SHUTDOWN_ACK && _thread != null ) {
128- val remaining = deadline - System .currentTimeMillis()
129- if (remaining <= 0 ) break
130- (this as Object ).wait(timeout)
131- }
122+ fun shutdown (timeout : Long ) {
123+ val deadline = System .currentTimeMillis() + timeout
124+ if (! isShutdownRequested) debugStatus = SHUTDOWN_REQ
125+ // loop while there is anything to do immediately or deadline passes
126+ while (debugStatus != SHUTDOWN_ACK && _thread != null ) {
127+ _thread ?.let { timeSource.unpark(it) } // wake up thread if present
128+ val remaining = deadline - System .currentTimeMillis()
129+ if (remaining <= 0 ) break
130+ (this as Object ).wait(timeout)
132131 }
133132 // restore fresh status
134133 debugStatus = FRESH
135134 }
136135
137136 @Synchronized
138- private fun acknowledgeShutdown () {
137+ private fun acknowledgeShutdownIfNeeded () {
138+ if (! isShutdownRequested) return
139139 debugStatus = SHUTDOWN_ACK
140140 resetAll() // clear queues
141141 (this as Object ).notifyAll()
0 commit comments