@@ -44,31 +44,32 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
4444 private var debugStatus: Int = FRESH
4545
4646 override fun run () {
47- var shutdownNanos = Long .MAX_VALUE
4847 timeSource.registerTimeLoopThread()
49- notifyStartup()
5048 try {
51- runLoop@ while (true ) {
52- Thread .interrupted() // just reset interruption flag
53- var parkNanos = processNextEvent()
54- if (parkNanos == Long .MAX_VALUE ) {
55- // nothing to do, initialize shutdown timeout
56- if (shutdownNanos == Long .MAX_VALUE ) {
57- val now = timeSource.nanoTime()
58- if (shutdownNanos == Long .MAX_VALUE ) shutdownNanos = now + KEEP_ALIVE_NANOS
59- val tillShutdown = shutdownNanos - now
60- if (tillShutdown <= 0 ) break @runLoop // shut thread down
61- parkNanos = parkNanos.coerceAtMost(tillShutdown)
62- } else
63- parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS ) // limit wait time anyway
64- }
65- if (parkNanos > 0 ) {
66- // check if shutdown was requested and bail out in this case
67- if (debugStatus == SHUTDOWN_REQ ) {
68- acknowledgeShutdown()
69- break @runLoop
70- } else {
71- timeSource.parkNanos(this , parkNanos)
49+ var shutdownNanos = Long .MAX_VALUE
50+ if (notifyStartup()) {
51+ runLoop@ while (true ) {
52+ Thread .interrupted() // just reset interruption flag
53+ var parkNanos = processNextEvent()
54+ if (parkNanos == Long .MAX_VALUE ) {
55+ // nothing to do, initialize shutdown timeout
56+ if (shutdownNanos == Long .MAX_VALUE ) {
57+ val now = timeSource.nanoTime()
58+ if (shutdownNanos == Long .MAX_VALUE ) shutdownNanos = now + KEEP_ALIVE_NANOS
59+ val tillShutdown = shutdownNanos - now
60+ if (tillShutdown <= 0 ) break @runLoop // shut thread down
61+ parkNanos = parkNanos.coerceAtMost(tillShutdown)
62+ } else
63+ parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS ) // limit wait time anyway
64+ }
65+ if (parkNanos > 0 ) {
66+ // check if shutdown was requested and bail out in this case
67+ if (debugStatus == SHUTDOWN_REQ ) {
68+ acknowledgeShutdown()
69+ break @runLoop
70+ } else {
71+ timeSource.parkNanos(this , parkNanos)
72+ }
7273 }
7374 }
7475 }
@@ -101,23 +102,26 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
101102 @Synchronized
102103 internal fun ensureStarted () {
103104 assert (_thread == null ) // ensure we are at a clean state
105+ assert (debugStatus == FRESH || debugStatus == SHUTDOWN_ACK )
104106 debugStatus = FRESH
105107 createThreadSync() // create fresh thread
106108 while (debugStatus == FRESH ) (this as Object ).wait()
107109 }
108110
109111 @Synchronized
110- private fun notifyStartup () {
112+ private fun notifyStartup (): Boolean {
113+ if (debugStatus == SHUTDOWN_REQ ) return false
111114 debugStatus = ACTIVE
112115 (this as Object ).notifyAll()
116+ return true
113117 }
114118
115119 // used for tests
116120 @Synchronized
117121 internal fun shutdown (timeout : Long ) {
118122 if (_thread != null ) {
119123 val deadline = System .currentTimeMillis() + timeout
120- if (debugStatus == ACTIVE ) debugStatus = SHUTDOWN_REQ
124+ if (debugStatus == ACTIVE || debugStatus == FRESH ) debugStatus = SHUTDOWN_REQ
121125 unpark()
122126 // loop while there is anything to do immediately or deadline passes
123127 while (debugStatus != SHUTDOWN_ACK && _thread != null ) {
0 commit comments