File tree Expand file tree Collapse file tree 1 file changed +4
-4
lines changed
kotlinx-coroutines-core/common/src/flow/operators Expand file tree Collapse file tree 1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import kotlinx.coroutines.channels.*
1313import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
1414import kotlinx.coroutines.flow.internal.*
1515import kotlinx.coroutines.internal.*
16+ import kotlinx.coroutines.sync.*
1617import kotlin.coroutines.*
1718import kotlin.jvm.*
1819import kotlinx.coroutines.flow.unsafeFlow as flow
@@ -149,16 +150,15 @@ private class ChannelFlowMerge<T>(
149150
150151 // The actual merge implementation with concurrency limit
151152 private suspend fun mergeImpl (scope : CoroutineScope , collector : ConcurrentFlowCollector <T >) {
152- val semaphore = Channel < Unit > (concurrency)
153+ val semaphore = Semaphore (concurrency)
153154 @Suppress(" UNCHECKED_CAST" )
154155 flow.collect { inner ->
155- // TODO real semaphore (#94)
156- semaphore.send(Unit ) // Acquire concurrency permit
156+ semaphore.acquire() // Acquire concurrency permit
157157 scope.launch {
158158 try {
159159 inner.collect(collector)
160160 } finally {
161- semaphore.receive () // Release concurrency permit
161+ semaphore.release () // Release concurrency permit
162162 }
163163 }
164164 }
You can’t perform that action at this time.
0 commit comments