99//
1010//===----------------------------------------------------------------------===//
1111
12- #if compiler(>=6.2)
13-
1412import Synchronization
1513import DequeModule
1614
17- @available ( AsyncAlgorithms 1 . 1 , * )
15+ @available ( AsyncAlgorithms 1 . 0 , * )
1816extension AsyncSequence
19- where Element: Sendable , Self: SendableMetatype , AsyncIterator: SendableMetatype {
17+ where Element: Sendable , Self: _SendableMetatype , AsyncIterator: _SendableMetatype {
2018 /// Creates a shared async sequence that allows multiple concurrent iterations over a single source.
2119 ///
2220 /// The `share` method transforms an async sequence into a shareable sequence that can be safely
@@ -67,7 +65,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
6765 ///
6866 public func share(
6967 bufferingPolicy: AsyncBufferSequencePolicy = . bounded( 1 )
70- ) -> some AsyncSequence < Element , Failure > & Sendable {
68+ ) -> AsyncShareSequence < Self > {
7169 // The iterator is transferred to the isolation of the iterating task
7270 // this has to be done "unsafely" since we cannot annotate the transfer
7371 // however since iterating an AsyncSequence types twice has been defined
@@ -114,9 +112,9 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
114112//
115113// This type is typically not used directly; instead, use the `share()` method on any
116114// async sequence that meets the sendability requirements.
117- @available ( AsyncAlgorithms 1 . 1 , * )
118- struct AsyncShareSequence < Base: AsyncSequence > : Sendable
119- where Base. Element: Sendable , Base: SendableMetatype , Base. AsyncIterator: SendableMetatype {
115+ @available ( AsyncAlgorithms 1 . 0 , * )
116+ public struct AsyncShareSequence < Base: AsyncSequence > : Sendable
117+ where Base. Element: Sendable , Base: _SendableMetatype , Base. AsyncIterator: _SendableMetatype {
120118 // Represents a single consumer's connection to the shared sequence.
121119 //
122120 // Each iterator of the shared sequence creates its own `Side` instance, which tracks
@@ -135,7 +133,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
135133 // - `continuation`: The continuation waiting for the next element (nil if not waiting)
136134 // - `position`: The consumer's current position in the shared buffer
137135 struct State {
138- var continuation : UnsafeContinuation < Result < Element ? , Failure > , Never > ?
136+ var continuation : UnsafeContinuation < Result < Base . Element ? , Error > , Never > ?
139137 var position = 0
140138
141139 // Creates a new state with the position adjusted by the given offset.
@@ -162,7 +160,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
162160 iteration. unregisterSide ( id)
163161 }
164162
165- func next( isolation actor : isolated ( any Actor ) ? ) async throws ( Failure ) -> Element ? {
163+ func next( isolation actor : isolated ( any Actor ) ? ) async throws -> Base . Element ? {
166164 try await iteration. next ( isolation: actor , id: id)
167165 }
168166 }
@@ -230,9 +228,9 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
230228 var generation = 0
231229 var sides = [ Int: Side . State] ( )
232230 var iteratingTask : IteratingTask
233- private( set) var buffer = Deque < Element > ( )
231+ private( set) var buffer = Deque < Base . Element > ( )
234232 private( set) var finished = false
235- private( set) var failure : Failure ?
233+ private( set) var failure : Error ?
236234 var cancelled = false
237235 var limit : UnsafeContinuation < Bool , Never > ?
238236 var demand : UnsafeContinuation < Void , Never > ?
@@ -311,7 +309,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
311309 // **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
312310 //
313311 // - Parameter element: The element to add to the buffer
314- mutating func enqueue( _ element: Element ) {
312+ mutating func enqueue( _ element: Base . Element ) {
315313 let count = buffer. count
316314
317315 switch storagePolicy {
@@ -335,20 +333,20 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
335333 finished = true
336334 }
337335
338- mutating func fail( _ error: Failure ) {
336+ mutating func fail( _ error: Error ) {
339337 finished = true
340338 failure = error
341339 }
342340 }
343341
344- let state : Mutex < State >
342+ let state : ManagedCriticalState < State >
345343 let limit : Int ?
346344
347345 init (
348346 _ iteratorFactory: @escaping @Sendable ( ) -> sending Base. AsyncIterator ,
349347 bufferingPolicy: AsyncBufferSequencePolicy
350348 ) {
351- state = Mutex ( State ( iteratorFactory, bufferingPolicy: bufferingPolicy) )
349+ state = ManagedCriticalState ( State ( iteratorFactory, bufferingPolicy: bufferingPolicy) )
352350 switch bufferingPolicy. policy {
353351 case . bounded( let limit) :
354352 self . limit = limit
@@ -478,15 +476,15 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
478476 }
479477
480478 struct Resumption {
481- let continuation : UnsafeContinuation < Result < Element ? , Failure > , Never >
482- let result : Result < Element ? , Failure >
479+ let continuation : UnsafeContinuation < Result < Base . Element ? , Error > , Never >
480+ let result : Result < Base . Element ? , Error >
483481
484482 func resume( ) {
485483 continuation. resume ( returning: result)
486484 }
487485 }
488486
489- func emit( _ result: Result < Element ? , Failure > ) {
487+ func emit( _ result: Result < Base . Element ? , Error > ) {
490488 let ( resumptions, limitContinuation, demandContinuation, cancelled) = state. withLock {
491489 state -> ( [ Resumption ] , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool ) in
492490 var resumptions = [ Resumption] ( )
@@ -533,12 +531,12 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
533531
534532 private func nextIteration(
535533 _ id: Int
536- ) async -> Result < AsyncShareSequence < Base > . Element ? , AsyncShareSequence < Base > . Failure > {
534+ ) async -> Result < Base . Element ? , Error > {
537535 return await withTaskCancellationHandler {
538536 await withUnsafeContinuation { continuation in
539537 let ( res, limitContinuation, demandContinuation, cancelled) = state. withLock {
540538 state -> (
541- Result < Element ? , Failure > ? , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool
539+ Result < Base . Element ? , Error > ? , UnsafeContinuation < Bool , Never > ? , UnsafeContinuation < Void , Never > ? , Bool
542540 ) in
543541 guard let side = state. sides [ id] else {
544542 return state. emit ( . success( nil ) , limit: limit)
@@ -587,11 +585,11 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
587585 }
588586 }
589587 } catch {
590- emit ( . failure( error as! Failure ) )
588+ emit ( . failure( error) )
591589 }
592590 }
593591
594- func next( isolation actor : isolated ( any Actor ) ? , id: Int ) async throws ( Failure ) -> Element ? {
592+ func next( isolation actor : isolated ( any Actor ) ? , id: Int ) async throws -> Base . Element ? {
595593 let ( factory, cancelled) = state. withLock { state -> ( ( @Sendable ( ) -> sending Base. AsyncIterator ) ? , Bool ) in
596594 switch state. iteratingTask {
597595 case . pending( let factory) :
@@ -697,30 +695,29 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
697695 }
698696}
699697
700- @available ( AsyncAlgorithms 1 . 1 , * )
698+ @available ( AsyncAlgorithms 1 . 0 , * )
701699extension AsyncShareSequence : AsyncSequence {
702- typealias Element = Base . Element
703- typealias Failure = Base . Failure
700+ public typealias Element = Base . Element
701+ public typealias Failure = Swift . Error
704702
705- struct Iterator : AsyncIteratorProtocol {
703+ public struct Iterator : AsyncIteratorProtocol {
706704 let side : Side
707705
708706 init ( _ iteration: Iteration ) {
709707 side = Side ( iteration)
710708 }
711709
712- mutating func next( ) async rethrows -> Element ? {
710+ mutating public func next( ) async rethrows -> Element ? {
713711 try await side. next ( isolation: nil )
714712 }
715713
716- mutating func next( isolation actor : isolated ( any Actor ) ? ) async throws ( Failure) -> Element ? {
714+ mutating public func next( isolation actor : isolated ( any Actor ) ? ) async throws ( Failure) -> Element ? {
717715 try await side. next ( isolation: actor )
718716 }
719717 }
720718
721- func makeAsyncIterator( ) -> Iterator {
719+ public func makeAsyncIterator( ) -> Iterator {
722720 Iterator ( extent. iteration)
723721 }
724722}
725723
726- #endif
0 commit comments