@@ -514,15 +514,18 @@ package final class AsyncProcess {
514514 if self . outputRedirection. redirectsOutput {
515515 let stdoutPipe = Pipe ( )
516516 let stderrPipe = Pipe ( )
517+ let stdoutStream = DispatchFD ( fileHandle: stdoutPipe. fileHandleForReading) . dataStream ( )
518+ let stderrStream = DispatchFD ( fileHandle: stderrPipe. fileHandleForReading) . dataStream ( )
517519
518520 group. enter ( )
519- stdoutPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
520- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
521- if data. count == 0 {
522- stdoutPipe. fileHandleForReading. readabilityHandler = nil
521+ Task {
522+ defer {
523+ print ( " --- finished consuming stdout --- " )
523524 group. leave ( )
524- } else {
525- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
525+ }
526+ print ( " --- started consuming stdout --- " )
527+ for try await data in stdoutStream {
528+ let contents = [ UInt8] ( data)
526529 self . outputRedirection. outputClosures? . stdoutClosure ( contents)
527530 stdoutLock. withLock {
528531 stdout += contents
@@ -531,13 +534,14 @@ package final class AsyncProcess {
531534 }
532535
533536 group. enter ( )
534- stderrPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
535- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
536- if data. count == 0 {
537- stderrPipe. fileHandleForReading. readabilityHandler = nil
537+ Task {
538+ defer {
539+ print ( " --- finished consuming stderr --- " )
538540 group. leave ( )
539- } else {
540- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
541+ }
542+ print ( " --- started consuming stderr --- " )
543+ for try await data in stderrStream {
544+ let contents = [ UInt8] ( data)
541545 self . outputRedirection. outputClosures? . stderrClosure ( contents)
542546 stderrLock. withLock {
543547 stderr += contents
@@ -557,6 +561,7 @@ package final class AsyncProcess {
557561 }
558562
559563 group. notify ( queue: self . completionQueue) {
564+ print ( " --- notified that output is ready --- " )
560565 self . stateLock. withLock {
561566 self . state = . outputReady( stdout: . success( stdout) , stderr: . success( stderr) )
562567 }
@@ -820,6 +825,7 @@ package final class AsyncProcess {
820825 /// Executes the process I/O state machine, calling completion block when finished.
821826 private func waitUntilExit( _ completion: @escaping ( Result < AsyncProcessResult , Swift . Error > ) -> Void ) {
822827 self . stateLock. lock ( )
828+ print ( " --- waitUntilExit called: \( self . state) --- " )
823829 switch self . state {
824830 case . idle:
825831 defer { self . stateLock. unlock ( ) }
@@ -832,7 +838,9 @@ package final class AsyncProcess {
832838 completion ( . failure( error) )
833839 case . readingOutput( let sync) :
834840 self . stateLock. unlock ( )
841+ print ( " --- queing up waitUntilExit block --- " )
835842 sync. notify ( queue: self . completionQueue) {
843+ print ( " --- was notified we should enter waitUntilExit again --- " )
836844 self . waitUntilExit ( completion)
837845 }
838846 case . outputReady( let stdoutResult, let stderrResult) :
@@ -1354,3 +1362,51 @@ extension FileHandle: WritableByteStream {
13541362 }
13551363}
13561364#endif
1365+
1366+ extension DispatchFD {
1367+ public func readChunk( upToLength maxLength: Int ) async throws -> DispatchData {
1368+ return try await withCheckedThrowingContinuation { continuation in
1369+ DispatchIO . read ( fromFileDescriptor: numericCast ( self . rawValue) , maxLength: maxLength, runningHandlerOn: DispatchQueue . global ( ) )
1370+ { data, error in
1371+ if error != 0 {
1372+ continuation. resume ( throwing: StringError ( " POSIX error: \( error) " ) )
1373+ return
1374+ }
1375+ continuation. resume ( returning: data)
1376+ }
1377+ }
1378+
1379+ }
1380+
1381+ /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1382+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
1383+ public func dataStream( ) -> some AsyncSequence < DispatchData , any Error > {
1384+ AsyncThrowingStream < DispatchData , any Error > {
1385+ while !Task. isCancelled {
1386+ let chunk = try await readChunk ( upToLength: 4096 )
1387+ if chunk. isEmpty {
1388+ return nil
1389+ }
1390+ return chunk
1391+ }
1392+ throw CancellationError ( )
1393+ }
1394+ }
1395+ }
1396+
1397+ public struct DispatchFD {
1398+ #if os(Windows)
1399+ fileprivate let rawValue : Int
1400+ #else
1401+ fileprivate let rawValue : Int32
1402+ #endif
1403+
1404+ init ( fileHandle: FileHandle ) {
1405+ #if os(Windows)
1406+ // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1407+ rawValue = . init( bitPattern: fileHandle. _handle)
1408+ #else
1409+ rawValue = fileHandle. fileDescriptor
1410+ #endif
1411+ }
1412+ }
0 commit comments