55package kotlinx.coroutines.flow
66
77import kotlinx.coroutines.*
8+ import kotlinx.coroutines.internal.*
89import org.junit.*
910
1011/* *
@@ -15,6 +16,15 @@ import org.junit.*
1516class SharingReferenceTest : TestBase () {
1617 private val token = object {}
1718
19+ /*
20+ * Single-threaded executor that we are using to ensure that the flow being sharing actually
21+ * suspended (spilled its locals, attached to parent), so we can verify reachability.
22+ * Without that, it's possible to have a situation where target flow is still
23+ * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails.
24+ */
25+ @get:Rule
26+ val executor = ExecutorRule (1 )
27+
1828 private val weakEmitter = flow {
1929 emit(null )
2030 // suspend forever without keeping a strong reference to continuation -- this is a model of
@@ -26,19 +36,26 @@ class SharingReferenceTest : TestBase() {
2636
2737 @Test
2838 fun testShareInReference () {
29- val flow = weakEmitter.shareIn(GlobalScope , SharingStarted .Eagerly , 0 )
39+ val flow = weakEmitter.shareIn(ContextScope (executor), SharingStarted .Eagerly , 0 )
40+ linearize()
3041 FieldWalker .assertReachableCount(1 , flow) { it == = token }
3142 }
3243
3344 @Test
3445 fun testStateInReference () {
35- val flow = weakEmitter.stateIn(GlobalScope , SharingStarted .Eagerly , null )
46+ val flow = weakEmitter.stateIn(ContextScope (executor), SharingStarted .Eagerly , null )
47+ linearize()
3648 FieldWalker .assertReachableCount(1 , flow) { it == = token }
3749 }
3850
3951 @Test
4052 fun testStateInSuspendingReference () = runTest {
4153 val flow = weakEmitter.stateIn(GlobalScope )
54+ linearize()
4255 FieldWalker .assertReachableCount(1 , flow) { it == = token }
4356 }
44- }
57+
58+ private fun linearize () {
59+ runBlocking(executor) { }
60+ }
61+ }
0 commit comments