@@ -17,7 +17,7 @@ import Swift
1717import Darwin
1818
1919func _lockWordCount( ) -> Int {
20- let sz =
20+ let sz =
2121 MemoryLayout < os_unfair_lock > . size / MemoryLayout < UnsafeRawPointer > . size
2222 return max ( sz, 1 )
2323}
@@ -57,7 +57,7 @@ extension AsyncStream {
5757 typealias TerminationHandler = @Sendable ( Continuation . Termination ) -> Void
5858
5959 struct State {
60- var continuation : UnsafeContinuation < Element ? , Never > ?
60+ var continuations = [ UnsafeContinuation < Element ? , Never > ] ( )
6161 var pending = _Deque < Element > ( )
6262 let limit : Continuation . BufferingPolicy
6363 var onTermination : TerminationHandler ?
@@ -105,7 +105,7 @@ extension AsyncStream {
105105 }
106106 }
107107
108- func cancel( ) {
108+ @ Sendable func cancel( ) {
109109 lock ( )
110110 // swap out the handler before we invoke it to prevent double cancel
111111 let handler = state. onTermination
@@ -123,7 +123,9 @@ extension AsyncStream {
123123 lock ( )
124124 let limit = state. limit
125125 let count = state. pending. count
126- if let continuation = state. continuation {
126+
127+ if !state. continuations. isEmpty {
128+ let continuation = state. continuations. removeFirst ( )
127129 if count > 0 {
128130 if !state. terminal {
129131 switch limit {
@@ -151,17 +153,14 @@ extension AsyncStream {
151153 } else {
152154 result = . terminated
153155 }
154- state. continuation = nil
155156 let toSend = state. pending. removeFirst ( )
156157 unlock ( )
157158 continuation. resume ( returning: toSend)
158159 } else if state. terminal {
159- state. continuation = nil
160160 result = . terminated
161161 unlock ( )
162162 continuation. resume ( returning: nil )
163163 } else {
164- state. continuation = nil
165164 switch limit {
166165 case . unbounded:
167166 result = . enqueued( remaining: . max)
@@ -212,15 +211,15 @@ extension AsyncStream {
212211 state. onTermination = nil
213212 state. terminal = true
214213
215- if let continuation = state. continuation {
214+ if let continuation = state. continuations . first {
216215 if state. pending. count > 0 {
217- state. continuation = nil
216+ state. continuations . removeFirst ( )
218217 let toSend = state. pending. removeFirst ( )
219218 unlock ( )
220219 handler ? ( . finished)
221220 continuation. resume ( returning: toSend)
222221 } else if state. terminal {
223- state. continuation = nil
222+ state. continuations . removeFirst ( )
224223 unlock ( )
225224 handler ? ( . finished)
226225 continuation. resume ( returning: nil )
@@ -236,22 +235,20 @@ extension AsyncStream {
236235
237236 func next( _ continuation: UnsafeContinuation < Element ? , Never > ) {
238237 lock ( )
239- if state. continuation == nil {
240- if state. pending. count > 0 {
241- let toSend = state. pending. removeFirst ( )
242- unlock ( )
243- continuation. resume ( returning: toSend)
244- } else if state. terminal {
245- unlock ( )
246- continuation. resume ( returning: nil )
247- } else {
248- state. continuation = continuation
249- unlock ( )
250- }
238+ state. continuations. append ( continuation)
239+ if state. pending. count > 0 {
240+ let cont = state. continuations. removeFirst ( )
241+ let toSend = state. pending. removeFirst ( )
242+ unlock ( )
243+ cont. resume ( returning: toSend)
244+ } else if state. terminal {
245+ let cont = state. continuations. removeFirst ( )
246+ unlock ( )
247+ cont. resume ( returning: nil )
251248 } else {
252249 unlock ( )
253- fatalError ( " attempt to await next() on more than one task " )
254250 }
251+
255252 }
256253
257254 func next( ) async -> Element ? {
@@ -341,7 +338,7 @@ extension AsyncThrowingStream {
341338 }
342339 }
343340
344- func cancel( ) {
341+ @ Sendable func cancel( ) {
345342 lock ( )
346343 // swap out the handler before we invoke it to prevent double cancel
347344 let handler = state. onTermination
@@ -595,3 +592,4 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
595592 return storage
596593 }
597594}
595+
0 commit comments