@@ -74,30 +74,29 @@ public struct Configuration: Sendable {
7474 let pid = spawnResults. execution. processIdentifier
7575
7676 var spawnResultBox : SpawnResult ? ? = consume spawnResults
77+ var _spawnResult = spawnResultBox!. take ( ) !
7778
78- return try await withAsyncTaskCleanupHandler {
79- var _spawnResult = spawnResultBox!. take ( ) !
79+ let processIdentifier = _spawnResult. execution. processIdentifier
80+
81+ let result = try await withAsyncTaskCleanupHandler {
8082 let inputIO = _spawnResult. inputWriteEnd ( )
8183 let outputIO = _spawnResult. outputReadEnd ( )
8284 let errorIO = _spawnResult. errorReadEnd ( )
83- let processIdentifier = _spawnResult. execution. processIdentifier
8485
85- async let terminationStatus = try monitorProcessTermination (
86- forProcessWithIdentifier: processIdentifier
87- )
8886 // Body runs in the same isolation
89- let result = try await body ( _spawnResult. execution, inputIO, outputIO, errorIO)
90- return ExecutionResult (
91- terminationStatus: try await terminationStatus,
92- value: result
93- )
87+ return try await body ( _spawnResult. execution, inputIO, outputIO, errorIO)
9488 } onCleanup: {
9589 // Attempt to terminate the child process
9690 await Execution . runTeardownSequence (
9791 self . platformOptions. teardownSequence,
9892 on: pid
9993 )
10094 }
95+
96+ return ExecutionResult (
97+ terminationStatus: try await monitorProcessTermination ( forProcessWithIdentifier: processIdentifier) ,
98+ value: result
99+ )
101100 }
102101}
103102
@@ -752,11 +751,13 @@ extension Optional where Wrapped == String {
752751 }
753752}
754753
754+ /// Runs `body`, and then runs `onCleanup` if body throws an error, or if the parent task is cancelled. In the latter case, `onCleanup` may be run concurrently with `body`. `body` is guaranteed to run exactly once. `onCleanup` is guaranteed to run only once, or not at all.
755755internal func withAsyncTaskCleanupHandler< Result> (
756756 _ body: ( ) async throws -> Result ,
757757 onCleanup handler: @Sendable @escaping ( ) async -> Void ,
758758 isolation: isolated ( any Actor ) ? = #isolation
759759) async rethrows -> Result {
760+ let ( runCancellationHandlerStream, runCancellationHandlerContinuation) = AsyncThrowingStream . makeStream ( of: Void . self)
760761 return try await withThrowingTaskGroup (
761762 of: Void . self,
762763 returning: Result . self
@@ -767,15 +768,38 @@ internal func withAsyncTaskCleanupHandler<Result>(
767768 // before the time ends. We then run the cancel handler.
768769 do { while true { try await Task . sleep ( nanoseconds: 1_000_000_000 ) } } catch { }
769770 // Run task cancel handler
770- await handler ( )
771+ runCancellationHandlerContinuation. finish ( throwing: CancellationError ( ) )
772+ }
773+
774+ group. addTask {
775+ // Enumerate the async stream until it completes or throws an error.
776+ // Since we signal completion of the stream from cancellation or the
777+ // parent task or the body throwing, this ensures that we run the
778+ // cleanup handler exactly once in any failure scenario, and also do
779+ // so _immediately_ if the failure scenario is due to parent task
780+ // cancellation. We do so in a detached Task to prevent cancellation
781+ // of the parent task from interrupting enumeration of the stream itself.
782+ await Task . detached {
783+ do {
784+ var iterator = runCancellationHandlerStream. makeAsyncIterator ( )
785+ while let _ = try await iterator. next ( ) {
786+ }
787+ } catch {
788+ await handler ( )
789+ }
790+ } . value
791+ }
792+
793+ defer {
794+ group. cancelAll ( )
771795 }
772796
773797 do {
774798 let result = try await body ( )
775- group . cancelAll ( )
799+ runCancellationHandlerContinuation . finish ( )
776800 return result
777801 } catch {
778- await handler ( )
802+ runCancellationHandlerContinuation . finish ( throwing : error )
779803 throw error
780804 }
781805 }
0 commit comments