@@ -27,11 +27,12 @@ import java.util.concurrent.ConcurrentHashMap
2727import java.util.concurrent.TimeUnit
2828import java.util.concurrent.locks.LockSupport
2929
30+ private const val WAIT_LOST_THREADS = 10_000L // 10s
3031private val ignoreLostThreads = mutableSetOf<String >()
3132
3233fun ignoreLostThreads (vararg s : String ) { ignoreLostThreads + = s }
3334
34- fun threadNames (): Set <String > {
35+ fun currentThreads (): Set <Thread > {
3536 var estimate = 0
3637 while (true ) {
3738 estimate = estimate.coerceAtLeast(Thread .activeCount() + 1 )
@@ -41,33 +42,37 @@ fun threadNames(): Set<String> {
4142 estimate = n + 1
4243 continue // retry with a better size estimate
4344 }
44- val names = hashSetOf<String >()
45+ val threads = hashSetOf<Thread >()
4546 for (i in 0 until n)
46- names .add(sanitizeThreadName( arrayOfThreads[i]!! .name) )
47- return names
47+ threads .add(arrayOfThreads[i]!! )
48+ return threads
4849 }
4950}
5051
51- // remove coroutine names from thread in case we have lost threads with coroutines running in them
52- private fun sanitizeThreadName (name : String ): String {
53- val i = name.indexOf(" @" )
54- return if (i < 0 ) name else name.substring(0 , i)
55- }
56-
57- fun checkTestThreads (threadNamesBefore : Set <String >) {
52+ fun checkTestThreads (threadsBefore : Set <Thread >) {
5853 // give threads some time to shutdown
59- val waitTill = System .currentTimeMillis() + 1000L
60- var diff: List <String >
54+ val waitTill = System .currentTimeMillis() + WAIT_LOST_THREADS
55+ var diff: List <Thread >
6156 do {
62- val threadNamesAfter = threadNames ()
63- diff = (threadNamesAfter - threadNamesBefore ).filter { name ->
64- ignoreLostThreads.none { prefix -> name.startsWith(prefix) }
57+ val threadsAfter = currentThreads ()
58+ diff = (threadsAfter - threadsBefore ).filter { thread ->
59+ ignoreLostThreads.none { prefix -> thread. name.startsWith(prefix) }
6560 }
6661 if (diff.isEmpty()) break
6762 } while (System .currentTimeMillis() <= waitTill)
6863 ignoreLostThreads.clear()
69- diff.forEach { println (" Lost thread '$it '" ) }
70- check(diff.isEmpty()) { " Lost ${diff.size} threads" }
64+ if (diff.isEmpty()) return
65+ val message = " Lost threads ${diff.map { it.name }} "
66+ println (" !!! $message " )
67+ println (" === Dumping lost thread stack traces" )
68+ diff.forEach { thread ->
69+ println (" Thread \" ${thread.name} \" ${thread.state} " )
70+ val trace = thread.stackTrace
71+ for (t in trace) println (" \t at ${t.className} .${t.methodName} (${t.fileName} :${t.lineNumber} )" )
72+ println ()
73+ }
74+ println (" ===" )
75+ error(message)
7176}
7277
7378fun trackTask (block : Runnable ) = timeSource.trackTask(block)
@@ -96,7 +101,7 @@ fun test(name: String, block: () -> Unit): List<String> = outputException(name)
96101 resetCoroutineId()
97102 // shutdown execution with old time source (in case it was working)
98103 DefaultExecutor .shutdown(SHUTDOWN_TIMEOUT )
99- val threadNamesBefore = threadNames ()
104+ val threadsBefore = currentThreads ()
100105 val testTimeSource = TestTimeSource (oldOut)
101106 timeSource = testTimeSource
102107 DefaultExecutor .ensureStarted() // should start with new time source
@@ -121,7 +126,7 @@ fun test(name: String, block: () -> Unit): List<String> = outputException(name)
121126 oldOut.println (" --- done" )
122127 System .setOut(oldOut)
123128 System .setErr(oldErr)
124- checkTestThreads(threadNamesBefore )
129+ checkTestThreads(threadsBefore )
125130 }
126131 return ByteArrayInputStream (bytes).bufferedReader().readLines()
127132}
0 commit comments