@@ -57,114 +57,38 @@ extension DispatchFD {
5757 }
5858 }
5959 }
60- }
6160
62- extension AsyncThrowingStream where Element == UInt8 , Failure == any Error {
6361 /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
6462 @available ( macOS, deprecated: 15.0 , message: " Use the AsyncSequence-returning overload. " )
6563 @available ( iOS, deprecated: 18.0 , message: " Use the AsyncSequence-returning overload. " )
6664 @available ( tvOS, deprecated: 18.0 , message: " Use the AsyncSequence-returning overload. " )
6765 @available ( watchOS, deprecated: 11.0 , message: " Use the AsyncSequence-returning overload. " )
6866 @available ( visionOS, deprecated: 2.0 , message: " Use the AsyncSequence-returning overload. " )
69- public static func _dataStream( reading fileDescriptor: DispatchFD , on queue: SWBQueue ) -> AsyncThrowingStream < Element , any Error > {
70- AsyncThrowingStream { continuation in
71- let newFD : DispatchFD
72- do {
73- newFD = try fileDescriptor. _duplicate ( )
74- } catch {
75- continuation. finish ( throwing: error)
76- return
77- }
78-
79- let io = SWBDispatchIO . stream ( fileDescriptor: newFD, queue: queue) { error in
80- do {
81- try newFD. _close ( )
82- if error != 0 {
83- continuation. finish ( throwing: POSIXError ( error, context: " dataStream(reading: \( fileDescriptor) )#1 " ) )
84- }
85- } catch {
86- continuation. finish ( throwing: error)
87- }
88- }
89- io. setLimit ( lowWater: 0 )
90- io. setLimit ( highWater: 4096 )
91-
92- continuation. onTermination = { termination in
93- if case . cancelled = termination {
94- io. close ( flags: . stop)
95- } else {
96- io. close ( )
97- }
98- }
99-
100- io. read ( offset: 0 , length: . max, queue: queue) { done, data, error in
101- guard error == 0 else {
102- continuation. finish ( throwing: POSIXError ( error, context: " dataStream(reading: \( fileDescriptor) )#2 " ) )
103- return
104- }
105-
106- let data = data ?? . empty
107- for element in data {
108- continuation. yield ( element)
109- }
110-
111- if done {
112- continuation. finish ( )
67+ public func _dataStream( ) -> AsyncThrowingStream < SWBDispatchData , any Error > {
68+ AsyncThrowingStream < SWBDispatchData , any Error > {
69+ while !Task. isCancelled {
70+ let chunk = try await readChunk ( upToLength: 4096 )
71+ if chunk. isEmpty {
72+ return nil
11373 }
74+ return chunk
11475 }
76+ throw CancellationError ( )
11577 }
11678 }
117- }
11879
119- @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
120- extension AsyncSequence where Element == UInt8 , Failure == any Error {
12180 /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
122- public static func dataStream( reading fileDescriptor: DispatchFD , on queue: SWBQueue ) -> any AsyncSequence < Element , any Error > {
123- AsyncThrowingStream < SWBDispatchData , any Error > { continuation in
124- let newFD : DispatchFD
125- do {
126- newFD = try fileDescriptor. _duplicate ( )
127- } catch {
128- continuation. finish ( throwing: error)
129- return
130- }
131-
132- let io = SWBDispatchIO . stream ( fileDescriptor: newFD, queue: queue) { error in
133- do {
134- try newFD. _close ( )
135- if error != 0 {
136- let context = " dataStream(reading: \( fileDescriptor) \" \( Result { try fileDescriptor. _filePath ( ) } ) \" )#1 "
137- continuation. finish ( throwing: POSIXError ( error, context: context) )
138- }
139- } catch {
140- continuation. finish ( throwing: error)
141- }
142- }
143- io. setLimit ( lowWater: 0 )
144- io. setLimit ( highWater: 4096 )
145-
146- continuation. onTermination = { termination in
147- if case . cancelled = termination {
148- io. close ( flags: . stop)
149- } else {
150- io. close ( )
151- }
152- }
153-
154- io. read ( offset: 0 , length: . max, queue: queue) { done, data, error in
155- guard error == 0 else {
156- let context = " dataStream(reading: \( fileDescriptor) \" \( Result { try fileDescriptor. _filePath ( ) } ) \" )#2 "
157- continuation. finish ( throwing: POSIXError ( error, context: context) )
158- return
159- }
160-
161- let data = data ?? . empty
162- continuation. yield ( data)
163-
164- if done {
165- continuation. finish ( )
81+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
82+ public func dataStream( ) -> some AsyncSequence < SWBDispatchData , any Error > {
83+ AsyncThrowingStream < SWBDispatchData , any Error > {
84+ while !Task. isCancelled {
85+ let chunk = try await readChunk ( upToLength: 4096 )
86+ if chunk. isEmpty {
87+ return nil
16688 }
89+ return chunk
16790 }
168- } . flattened
91+ throw CancellationError ( )
92+ }
16993 }
17094}
0 commit comments