Skip to content

Commit 95b7a13

Browse files
authored
Flows returned by stateIn/shareIn keep strong reference to sharing job (#2701)
* Flows returned by stateIn/shareIn keep strong reference to sharing job Fixes #2557 * ~ Review fixes: fewer classes, suppress unused warning
1 parent 7053405 commit 95b7a13

File tree

2 files changed

+58
-11
lines changed

2 files changed

+58
-11
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Share.kt

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ public fun <T> Flow<T>.shareIn(
144144
onBufferOverflow = config.onBufferOverflow
145145
)
146146
@Suppress("UNCHECKED_CAST")
147-
scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
148-
return shared.asSharedFlow()
147+
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
148+
return ReadonlySharedFlow(shared, job)
149149
}
150150

151151
private class SharingConfig<T>(
@@ -197,7 +197,7 @@ private fun <T> CoroutineScope.launchSharing(
197197
shared: MutableSharedFlow<T>,
198198
started: SharingStarted,
199199
initialValue: T
200-
) {
200+
): Job =
201201
launch(context) { // the single coroutine to rule the sharing
202202
// Optimize common built-in started strategies
203203
when {
@@ -230,7 +230,6 @@ private fun <T> CoroutineScope.launchSharing(
230230
}
231231
}
232232
}
233-
}
234233

235234
// -------------------------------- stateIn --------------------------------
236235

@@ -303,8 +302,8 @@ public fun <T> Flow<T>.stateIn(
303302
): StateFlow<T> {
304303
val config = configureSharing(1)
305304
val state = MutableStateFlow(initialValue)
306-
scope.launchSharing(config.context, config.upstream, state, started, initialValue)
307-
return state.asStateFlow()
305+
val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)
306+
return ReadonlyStateFlow(state, job)
308307
}
309308

310309
/**
@@ -332,7 +331,7 @@ private fun <T> CoroutineScope.launchSharingDeferred(
332331
upstream.collect { value ->
333332
state?.let { it.value = value } ?: run {
334333
state = MutableStateFlow(value).also {
335-
result.complete(it.asStateFlow())
334+
result.complete(ReadonlyStateFlow(it, coroutineContext.job))
336335
}
337336
}
338337
}
@@ -351,23 +350,27 @@ private fun <T> CoroutineScope.launchSharingDeferred(
351350
* Represents this mutable shared flow as a read-only shared flow.
352351
*/
353352
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
354-
ReadonlySharedFlow(this)
353+
ReadonlySharedFlow(this, null)
355354

356355
/**
357356
* Represents this mutable state flow as a read-only state flow.
358357
*/
359358
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
360-
ReadonlyStateFlow(this)
359+
ReadonlyStateFlow(this, null)
361360

362361
private class ReadonlySharedFlow<T>(
363-
flow: SharedFlow<T>
362+
flow: SharedFlow<T>,
363+
@Suppress("unused")
364+
private val job: Job? // keeps a strong reference to the job (if present)
364365
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
365366
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
366367
fuseSharedFlow(context, capacity, onBufferOverflow)
367368
}
368369

369370
private class ReadonlyStateFlow<T>(
370-
flow: StateFlow<T>
371+
flow: StateFlow<T>,
372+
@Suppress("unused")
373+
private val job: Job? // keeps a strong reference to the job (if present)
371374
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
372375
override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
373376
fuseStateFlow(context, capacity, onBufferOverflow)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.*
9+
10+
/**
11+
* Tests that shared flows keep strong reference to their source flows.
12+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/2557
13+
*/
14+
@OptIn(DelicateCoroutinesApi::class)
15+
class SharingReferenceTest : TestBase() {
16+
private val token = object {}
17+
18+
private val weakEmitter = flow {
19+
emit(null)
20+
// suspend forever without keeping a strong reference to continuation -- this is a model of
21+
// a callback API that does not keep a strong reference it is listeners, but works
22+
suspendCancellableCoroutine<Unit> { }
23+
// using the token here to make it easily traceable
24+
emit(token)
25+
}
26+
27+
@Test
28+
fun testShareInReference() {
29+
val flow = weakEmitter.shareIn(GlobalScope, SharingStarted.Eagerly, 0)
30+
FieldWalker.assertReachableCount(1, flow) { it === token }
31+
}
32+
33+
@Test
34+
fun testStateInReference() {
35+
val flow = weakEmitter.stateIn(GlobalScope, SharingStarted.Eagerly, null)
36+
FieldWalker.assertReachableCount(1, flow) { it === token }
37+
}
38+
39+
@Test
40+
fun testStateInSuspendingReference() = runTest {
41+
val flow = weakEmitter.stateIn(GlobalScope)
42+
FieldWalker.assertReachableCount(1, flow) { it === token }
43+
}
44+
}

0 commit comments

Comments
 (0)