@@ -29,19 +29,17 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
2929 public typealias Element = SequenceOutput . Buffer
3030
3131 private let diskIO : TrackedPlatformDiskIO
32- private let bufferSize : Int
3332 private var buffer : [ UInt8 ]
3433 private var currentPosition : Int
3534 private var finished : Bool
36- private var streamIterator : AsyncThrowingStream < StreamStatus , Swift . Error > . AsyncIterator
35+ private var streamIterator : AsyncThrowingStream < TrackedPlatformDiskIO . StreamStatus , Swift . Error > . AsyncIterator
3736
38- internal init ( diskIO: TrackedPlatformDiskIO , bufferSize : Int ) {
37+ internal init ( diskIO: TrackedPlatformDiskIO ) {
3938 self . diskIO = diskIO
40- self . bufferSize = bufferSize
4139 self . buffer = [ ]
4240 self . currentPosition = 0
4341 self . finished = false
44- self . streamIterator = Self . createDataStream ( with : diskIO . dispatchIO , bufferSize : bufferSize ) . makeAsyncIterator ( )
42+ self . streamIterator = diskIO . readDataStream ( upToLength : readBufferSize ) . makeAsyncIterator ( )
4543 }
4644
4745 public mutating func next( ) async throws -> SequenceOutput . Buffer ? {
@@ -51,7 +49,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
5149 return data
5250
5351 case . endOfStream( let data) :
54- streamIterator = Self . createDataStream ( with : diskIO . dispatchIO , bufferSize : bufferSize ) . makeAsyncIterator ( )
52+ streamIterator = diskIO . readDataStream ( upToLength : readBufferSize ) . makeAsyncIterator ( )
5553 return data
5654
5755 case . endOfFile:
@@ -63,66 +61,24 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
6361 return nil
6462 }
6563 }
66-
67- private enum StreamStatus {
68- case data( SequenceOutput . Buffer )
69- case endOfStream( SequenceOutput . Buffer )
70- case endOfFile
71- }
72-
73- private static func createDataStream( with dispatchIO: DispatchIO , bufferSize: Int ) -> AsyncThrowingStream < StreamStatus , Swift . Error > {
74- return AsyncThrowingStream < StreamStatus , Swift . Error > { continuation in
75- dispatchIO. read (
76- offset: 0 ,
77- length: bufferSize,
78- queue: . global( )
79- ) { done, data, error in
80- if error != 0 {
81- continuation. finish ( throwing: SubprocessError (
82- code: . init( . failedToReadFromSubprocess) ,
83- underlyingError: . init( rawValue: error)
84- ) )
85- return
86- }
87-
88- // Treat empty data and nil as the same
89- let buffer = data. map { $0. isEmpty ? nil : $0 } ?? nil
90- let status : StreamStatus
91-
92- switch ( buffer, done) {
93- case ( . some( let data) , false ) :
94- status = . data( SequenceOutput . Buffer ( data: data) )
95-
96- case ( . some( let data) , true ) :
97- status = . endOfStream( SequenceOutput . Buffer ( data: data) )
98-
99- case ( nil , false ) :
100- return
101-
102- case ( nil , true ) :
103- status = . endOfFile
104- }
105-
106- continuation. yield ( status)
107-
108- if done {
109- continuation. finish ( )
110- }
111- }
112- }
113- }
11464 }
11565
11666 private let diskIO : TrackedPlatformDiskIO
117- private let bufferSize : Int
11867
119- internal init ( diskIO: TrackedPlatformDiskIO , bufferSize : Int ) {
68+ internal init ( diskIO: TrackedPlatformDiskIO ) {
12069 self . diskIO = diskIO
121- self . bufferSize = bufferSize
12270 }
12371
12472 public func makeAsyncIterator( ) -> Iterator {
125- return Iterator ( diskIO: self . diskIO, bufferSize: bufferSize)
73+ return Iterator ( diskIO: self . diskIO)
74+ }
75+ }
76+
77+ extension TrackedPlatformDiskIO {
78+ internal enum StreamStatus {
79+ case data( SequenceOutput . Buffer )
80+ case endOfStream( SequenceOutput . Buffer )
81+ case endOfFile
12682 }
12783}
12884
0 commit comments