@@ -37,6 +37,7 @@ extension Configuration {
3737 outputPipe: consuming CreatedPipe ,
3838 errorPipe: consuming CreatedPipe
3939 ) throws -> SpawnResult {
40+ // Ensure the waiter thread is running.
4041 _setupMonitorSignalHandler ( )
4142
4243 // Instead of checking if every possible executable path
@@ -266,32 +267,53 @@ extension String {
266267internal func monitorProcessTermination(
267268 forProcessWithIdentifier pid: ProcessIdentifier
268269) async throws -> TerminationStatus {
269- return try await withCheckedThrowingContinuation { continuation in
270+ try await withCheckedThrowingContinuation { continuation in
270271 _childProcessContinuations. withLock { continuations in
271- if let existing = continuations. removeValue ( forKey: pid. value) ,
272- case . status( let existingStatus) = existing
273- {
274- // We already have existing status to report
275- continuation. resume ( returning: existingStatus)
276- } else {
277- // Save the continuation for handler
278- continuations [ pid. value] = . continuation( continuation)
279- }
272+ // We don't need to worry about a race condition here because waitid()
273+ // does not clear the wait/zombie state of the child process. If it sees
274+ // the child process has terminated and manages to acquire the lock before
275+ // we add this continuation to the dictionary, then it will simply loop
276+ // and report the status again.
277+ let oldContinuation = continuations. updateValue ( continuation, forKey: pid. value)
278+ precondition ( oldContinuation == nil )
279+
280+ // Wake up the waiter thread if it is waiting for more child processes.
281+ _ = pthread_cond_signal ( _waitThreadNoChildrenCondition)
280282 }
281283 }
282284}
283285
284- private enum ContinuationOrStatus {
285- case continuation( CheckedContinuation < TerminationStatus , any Error > )
286- case status( TerminationStatus )
286+ // Small helper to provide thread-safe access to the child process to continuations map as well as a condition variable to suspend the calling thread when there are no subprocesses to wait for. Note that Mutex cannot be used here because we need the semantics of pthread_cond_wait, which requires passing the pthread_mutex_t instance as a parameter, something the Mutex API does not provide access to.
287+ private final class ChildProcessContinuations : Sendable {
288+ private nonisolated ( unsafe) var continuations = [ pid_t : CheckedContinuation < TerminationStatus , any Error > ] ( )
289+ private nonisolated ( unsafe) let mutex = UnsafeMutablePointer< pthread_mutex_t> . allocate( capacity: 1 )
290+
291+ init ( ) {
292+ pthread_mutex_init ( mutex, nil )
293+ }
294+
295+ func withLock< R> ( _ body: ( inout [ pid_t : CheckedContinuation < TerminationStatus , any Error > ] ) throws -> R ) rethrows -> R {
296+ try withUnsafeUnderlyingLock { _, continuations in
297+ try body ( & continuations)
298+ }
299+ }
300+
301+ func withUnsafeUnderlyingLock< R> ( _ body: ( UnsafeMutablePointer < pthread_mutex_t > , inout [ pid_t : CheckedContinuation < TerminationStatus , any Error > ] ) throws -> R ) rethrows -> R {
302+ pthread_mutex_lock ( mutex)
303+ defer {
304+ pthread_mutex_unlock ( mutex)
305+ }
306+ return try body ( mutex, & continuations)
307+ }
287308}
288309
289- private let _childProcessContinuations :
290- Mutex <
291- [ pid_t : ContinuationOrStatus ]
292- > = Mutex ( [ : ] )
310+ private let _childProcessContinuations = ChildProcessContinuations ( )
293311
294- private let signalSource : SendableSourceSignal = SendableSourceSignal ( )
312+ private nonisolated ( unsafe) let _waitThreadNoChildrenCondition = {
313+ let result = UnsafeMutablePointer< pthread_cond_t> . allocate( capacity: 1 )
314+ _ = pthread_cond_init ( result, nil )
315+ return result
316+ } ( )
295317
296318private extension siginfo_t {
297319 var si_status : Int32 {
@@ -316,64 +338,70 @@ private extension siginfo_t {
316338}
317339
318340private let setup : ( ) = {
319- signalSource. setEventHandler {
320- while true {
321- var siginfo = siginfo_t ( )
322- guard waitid ( P_ALL, id_t ( 0 ) , & siginfo, WEXITED) == 0 || errno == EINTR else {
323- return
324- }
325- var status : TerminationStatus ? = nil
326- switch siginfo. si_code {
327- case . init( CLD_EXITED) :
328- status = . exited( siginfo. si_status)
329- case . init( CLD_KILLED) , . init( CLD_DUMPED) :
330- status = . unhandledException( siginfo. si_status)
331- case . init( CLD_TRAPPED) , . init( CLD_STOPPED) , . init( CLD_CONTINUED) :
332- // Ignore these signals because they are not related to
333- // process exiting
334- break
335- default :
336- fatalError ( " Unexpected exit status: \( siginfo. si_code) " )
337- }
338- if let status = status {
339- _childProcessContinuations. withLock { continuations in
341+ // Create the thread. It will run immediately; because it runs in an infinite
342+ // loop, we aren't worried about detaching or joining it.
343+ var thread = pthread_t ( )
344+ _ = pthread_create (
345+ & thread,
346+ nil ,
347+ { _ -> UnsafeMutableRawPointer ? in
348+ // Run an infinite loop that waits for child processes to terminate and
349+ // captures their exit statuses.
350+ while true {
351+ // Listen for child process exit events. WNOWAIT means we don't perturb the
352+ // state of a terminated (zombie) child process, allowing us to fetch the
353+ // continuation (if available) before reaping.
354+ var siginfo = siginfo_t ( )
355+ errno = 0
356+ if waitid ( P_ALL, id_t ( 0 ) , & siginfo, WEXITED | WNOWAIT) == 0 {
340357 let pid = siginfo. si_pid
341- if let existing = continuations. removeValue ( forKey: pid) ,
342- case . continuation( let c) = existing
343- {
344- c. resume ( returning: status)
345- } else {
346- // We don't have continuation yet, just state status
347- continuations [ pid] = . status( status)
358+
359+ // If we had a continuation for this PID, allow the process to be reaped
360+ // and pass the resulting exit condition back to the calling task. If
361+ // there is no continuation, then either it hasn't been stored yet or
362+ // this child process is not tracked by the waiter thread.
363+ guard pid != 0 , let c = _childProcessContinuations. withLock ( { $0. removeValue ( forKey: pid) } ) else {
364+ continue
365+ }
366+
367+ c. resume ( with: Result {
368+ // Here waitid should not block because `pid` has already terminated at this point.
369+ while true {
370+ var siginfo = siginfo_t ( )
371+ errno = 0
372+ if waitid ( P_PID, numericCast ( pid) , & siginfo, WEXITED) == 0 {
373+ var status : TerminationStatus ? = nil
374+ switch siginfo. si_code {
375+ case . init( CLD_EXITED) :
376+ return . exited( siginfo. si_status)
377+ case . init( CLD_KILLED) , . init( CLD_DUMPED) :
378+ return . unhandledException( siginfo. si_status)
379+ default :
380+ fatalError ( " Unexpected exit status: \( siginfo. si_code) " )
381+ }
382+ } else if errno != EINTR {
383+ throw SubprocessError . UnderlyingError ( rawValue: errno)
384+ }
385+ }
386+ } )
387+ } else if errno == ECHILD {
388+ // We got ECHILD. If there are no continuations added right now, we should
389+ // suspend this thread on the no-children condition until it's awoken by a
390+ // newly-scheduled waiter process. (If this condition is spuriously
391+ // woken, we'll just loop again, which is fine.) Note that we read errno
392+ // outside the lock in case acquiring the lock perturbs it.
393+ _childProcessContinuations. withUnsafeUnderlyingLock { lock, childProcessContinuations in
394+ if childProcessContinuations. isEmpty {
395+ _ = pthread_cond_wait ( _waitThreadNoChildrenCondition, lock)
396+ }
348397 }
349398 }
350399 }
351- }
352- }
353- signalSource . resume ( )
400+ } ,
401+ nil
402+ )
354403} ( )
355404
356- /// Unchecked Sendable here since this class is only explicitly
357- /// initialized once during the lifetime of the process
358- final class SendableSourceSignal : @unchecked Sendable {
359- private let signalSource : DispatchSourceSignal
360-
361- func setEventHandler( handler: @escaping DispatchSourceHandler ) {
362- self . signalSource. setEventHandler ( handler: handler)
363- }
364-
365- func resume( ) {
366- self . signalSource. resume ( )
367- }
368-
369- init ( ) {
370- self . signalSource = DispatchSource . makeSignalSource (
371- signal: SIGCHLD,
372- queue: . global( )
373- )
374- }
375- }
376-
377405private func _setupMonitorSignalHandler( ) {
378406 // Only executed once
379407 setup
0 commit comments