@@ -514,15 +514,16 @@ package final class AsyncProcess {
514514 if self . outputRedirection. redirectsOutput {
515515 let stdoutPipe = Pipe ( )
516516 let stderrPipe = Pipe ( )
517+ let stdoutStream = DispatchFD ( fileHandle: stdoutPipe. fileHandleForReading) . dataStream ( )
518+ let stderrStream = DispatchFD ( fileHandle: stderrPipe. fileHandleForReading) . dataStream ( )
517519
518520 group. enter ( )
519- stdoutPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
520- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
521- if data. count == 0 {
522- stdoutPipe. fileHandleForReading. readabilityHandler = nil
521+ Task {
522+ defer {
523523 group. leave ( )
524- } else {
525- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
524+ }
525+ for try await data in stdoutStream {
526+ let contents = [ UInt8] ( data)
526527 self . outputRedirection. outputClosures? . stdoutClosure ( contents)
527528 stdoutLock. withLock {
528529 stdout += contents
@@ -531,16 +532,15 @@ package final class AsyncProcess {
531532 }
532533
533534 group. enter ( )
534- stderrPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
535- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
536- if data. count == 0 {
537- stderrPipe. fileHandleForReading. readabilityHandler = nil
535+ Task {
536+ defer {
538537 group. leave ( )
539- } else {
540- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
541- self . outputRedirection. outputClosures? . stderrClosure ( contents)
542- stderrLock. withLock {
543- stderr += contents
538+ }
539+ for try await data in stderrStream {
540+ let contents = [ UInt8] ( data)
541+ self . outputRedirection. outputClosures? . stdoutClosure ( contents)
542+ stdoutLock. withLock {
543+ stdout += contents
544544 }
545545 }
546546 }
@@ -1354,3 +1354,51 @@ extension FileHandle: WritableByteStream {
13541354 }
13551355}
13561356#endif
1357+
1358+ extension DispatchFD {
1359+ public func readChunk( upToLength maxLength: Int ) async throws -> DispatchData {
1360+ return try await withCheckedThrowingContinuation { continuation in
1361+ DispatchIO . read ( fromFileDescriptor: numericCast ( self . rawValue) , maxLength: maxLength, runningHandlerOn: DispatchQueue . global ( ) )
1362+ { data, error in
1363+ if error != 0 {
1364+ continuation. resume ( throwing: StringError ( " POSIX error: \( error) " ) )
1365+ return
1366+ }
1367+ continuation. resume ( returning: data)
1368+ }
1369+ }
1370+
1371+ }
1372+
1373+ /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1374+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
1375+ public func dataStream( ) -> some AsyncSequence < DispatchData , any Error > {
1376+ AsyncThrowingStream < DispatchData , any Error > {
1377+ while !Task. isCancelled {
1378+ let chunk = try await readChunk ( upToLength: 4096 )
1379+ if chunk. isEmpty {
1380+ return nil
1381+ }
1382+ return chunk
1383+ }
1384+ throw CancellationError ( )
1385+ }
1386+ }
1387+ }
1388+
1389+ public struct DispatchFD {
1390+ #if os(Windows)
1391+ fileprivate let rawValue : Int
1392+ #else
1393+ fileprivate let rawValue : Int32
1394+ #endif
1395+
1396+ init ( fileHandle: FileHandle ) {
1397+ #if os(Windows)
1398+ // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1399+ rawValue = . init( bitPattern: fileHandle. _handle)
1400+ #else
1401+ rawValue = fileHandle. fileDescriptor
1402+ #endif
1403+ }
1404+ }
0 commit comments