@@ -51,6 +51,7 @@ export class RunExecution {
5151
5252 private _runFriendlyId ?: string ;
5353 private currentSnapshotId ?: string ;
54+ private currentAttemptNumber ?: number ;
5455 private currentTaskRunEnv ?: Record < string , string > ;
5556
5657 private dequeuedAt ?: Date ;
@@ -65,6 +66,7 @@ export class RunExecution {
6566 private snapshotPoller ?: RunExecutionSnapshotPoller ;
6667
6768 private lastHeartbeat ?: Date ;
69+ private isShuttingDown = false ;
6870
6971 constructor ( opts : RunExecutionOptions ) {
7072 this . id = randomBytes ( 4 ) . toString ( "hex" ) ;
@@ -86,10 +88,6 @@ export class RunExecution {
8688 throw new Error ( "prepareForExecution called after process was already created" ) ;
8789 }
8890
89- if ( this . isPreparedForNextRun ) {
90- throw new Error ( "prepareForExecution called after execution was already prepared" ) ;
91- }
92-
9391 this . taskRunProcess = this . createTaskRunProcess ( {
9492 envVars : opts . taskRunEnv ,
9593 isWarmStart : true ,
@@ -150,9 +148,14 @@ export class RunExecution {
150148 }
151149
152150 /**
153- * Returns true if the execution has been prepared with task run env .
151+ * Returns true if no run has been started yet and the process is prepared for the next run .
154152 */
155- get isPreparedForNextRun ( ) : boolean {
153+ get canExecute ( ) : boolean {
154+ // If we've ever had a run ID, this execution can't be reused
155+ if ( this . _runFriendlyId ) {
156+ return false ;
157+ }
158+
156159 return ! ! this . taskRunProcess ?. isPreparedForNextRun ;
157160 }
158161
@@ -161,6 +164,11 @@ export class RunExecution {
161164 * or when the snapshot poller detects a change
162165 */
163166 public async handleSnapshotChange ( runData : RunExecutionData ) : Promise < void > {
167+ if ( this . isShuttingDown ) {
168+ this . sendDebugLog ( "handleSnapshotChange: shutting down, skipping" ) ;
169+ return ;
170+ }
171+
164172 const { run, snapshot, completedWaitpoints } = runData ;
165173
166174 const snapshotMetadata = {
@@ -191,8 +199,6 @@ export class RunExecution {
191199 return ;
192200 }
193201
194- this . sendDebugLog ( `enqueued snapshot change: ${ snapshot . executionStatus } ` , snapshotMetadata ) ;
195-
196202 this . snapshotChangeQueue . push ( runData ) ;
197203 await this . processSnapshotChangeQueue ( ) ;
198204 }
@@ -240,11 +246,16 @@ export class RunExecution {
240246 }
241247
242248 if ( snapshot . friendlyId === this . currentSnapshotId ) {
243- this . sendDebugLog ( "handleSnapshotChange: snapshot not changed" , snapshotMetadata ) ;
244249 return ;
245250 }
246251
247- this . sendDebugLog ( `snapshot change: ${ snapshot . executionStatus } ` , snapshotMetadata ) ;
252+ if ( this . currentAttemptNumber && this . currentAttemptNumber !== run . attemptNumber ) {
253+ this . sendDebugLog ( "ERROR: attempt number mismatch" , snapshotMetadata ) ;
254+ await this . taskRunProcess ?. suspend ( ) ;
255+ return ;
256+ }
257+
258+ this . sendDebugLog ( `snapshot has changed to: ${ snapshot . executionStatus } ` , snapshotMetadata ) ;
248259
249260 // Reset the snapshot poll interval so we don't do unnecessary work
250261 this . snapshotPoller ?. resetCurrentInterval ( ) ;
@@ -456,6 +467,16 @@ export class RunExecution {
456467 // A snapshot was just created, so update the snapshot ID
457468 this . currentSnapshotId = start . data . snapshot . friendlyId ;
458469
470+ // Also set or update the attempt number - we do this to detect illegal attempt number changes, e.g. from stalled runners coming back online
471+ const attemptNumber = start . data . run . attemptNumber ;
472+ if ( attemptNumber && attemptNumber > 0 ) {
473+ this . currentAttemptNumber = attemptNumber ;
474+ } else {
475+ this . sendDebugLog ( "ERROR: invalid attempt number returned from start attempt" , {
476+ attemptNumber : String ( attemptNumber ) ,
477+ } ) ;
478+ }
479+
459480 const metrics = this . measureExecutionMetrics ( {
460481 attemptCreatedAt : attemptStartedAt ,
461482 dequeuedAt : this . dequeuedAt ?. getTime ( ) ,
@@ -597,8 +618,18 @@ export class RunExecution {
597618 metrics : TaskRunExecutionMetrics ;
598619 isWarmStart ?: boolean ;
599620 } ) {
621+ // For immediate retries, we need to ensure the task run process is prepared for the next attempt
622+ if (
623+ this . runFriendlyId &&
624+ this . taskRunProcess &&
625+ ! this . taskRunProcess . isPreparedForNextAttempt
626+ ) {
627+ this . sendDebugLog ( "killing existing task run process before executing next attempt" ) ;
628+ await this . kill ( ) . catch ( ( ) => { } ) ;
629+ }
630+
600631 // To skip this step and eagerly create the task run process, run prepareForExecution first
601- if ( ! this . taskRunProcess || ! this . isPreparedForNextRun ) {
632+ if ( ! this . taskRunProcess || ! this . taskRunProcess . isPreparedForNextRun ) {
602633 this . taskRunProcess = this . createTaskRunProcess ( { envVars, isWarmStart } ) ;
603634 }
604635
@@ -655,11 +686,15 @@ export class RunExecution {
655686 }
656687
657688 public exit ( ) {
658- if ( this . isPreparedForNextRun ) {
689+ if ( this . taskRunProcess ?. isPreparedForNextRun ) {
659690 this . taskRunProcess ?. forceExit ( ) ;
660691 }
661692 }
662693
694+ public async kill ( ) {
695+ await this . taskRunProcess ?. kill ( "SIGKILL" ) ;
696+ }
697+
663698 private async complete ( { completion } : { completion : TaskRunExecutionResult } ) : Promise < void > {
664699 if ( ! this . runFriendlyId || ! this . currentSnapshotId ) {
665700 throw new Error ( "Cannot complete run: missing run or snapshot ID" ) ;
@@ -897,7 +932,7 @@ export class RunExecution {
897932 this . lastHeartbeat = new Date ( ) ;
898933 }
899934
900- sendDebugLog (
935+ private sendDebugLog (
901936 message : string ,
902937 properties ?: SendDebugLogOptions [ "properties" ] ,
903938 runIdOverride ?: string
@@ -958,6 +993,11 @@ export class RunExecution {
958993 }
959994
960995 private stopServices ( ) {
996+ if ( this . isShuttingDown ) {
997+ return ;
998+ }
999+
1000+ this . isShuttingDown = true ;
9611001 this . snapshotPoller ?. stop ( ) ;
9621002 this . taskRunProcess ?. onTaskRunHeartbeat . detach ( ) ;
9631003 }
0 commit comments