@@ -97,6 +97,12 @@ class ProdWorker {
9797 idempotencyKey : string ;
9898 }
9999 | undefined ;
100+ private readyForResumeReplay :
101+ | {
102+ idempotencyKey : string ;
103+ type : WaitReason ;
104+ }
105+ | undefined ;
100106
101107 #httpPort: number ;
102108 #httpServer: ReturnType < typeof createServer > ;
@@ -365,10 +371,18 @@ class ProdWorker {
/**
 * Resets the worker's per-attempt state before a retry.
 *
 * Un-pauses the worker, drops the pending resume reason, clears the
 * post-start / executing flags and the attempt identifiers, and discards
 * every replay / fallback record held on the instance.
 *
 * Declared `async`, but the body performs no awaits; callers simply get an
 * already-resolved promise.
 */
365371 async #prepareForRetry( ) {
366372 // Clear state for retrying
367373 this . paused = false ;
// A retry must not inherit the resume reason recorded for the previous attempt.
374+ this . nextResumeAfter = undefined ;
368375 this . waitForPostStart = false ;
369376 this . executing = false ;
370377 this . attemptFriendlyId = undefined ;
371378 this . attemptNumber = undefined ;
379+
// Clearing readyForResumeReplay makes the retry loop in #readyForResume stop
// at its next check. NOTE(review): presumably the other replay records are
// polled the same way by their own loops — confirm against those methods.
380+ // Clear replay state
381+ this . waitForTaskReplay = undefined ;
382+ this . waitForBatchReplay = undefined ;
383+ this . readyForLazyAttemptReplay = undefined ;
384+ this . durationResumeFallback = undefined ;
385+ this . readyForResumeReplay = undefined ;
372386 }
373387
374388 // MARK: CHECKPOINT PREP
@@ -405,13 +419,16 @@ class ProdWorker {
405419 this . waitForPostStart = false ;
406420
407421 this . durationResumeFallback = undefined ;
422+ this . readyForResumeReplay = undefined ;
408423
409424 this . _taskRunProcess ?. waitCompletedNotification ( ) ;
410425 }
411426
412427 async #readyForLazyAttempt( ) {
413428 const idempotencyKey = randomUUID ( ) ;
414429
430+ logger . log ( "ready for lazy attempt" , { idempotencyKey } ) ;
431+
415432 this . readyForLazyAttemptReplay = {
416433 idempotencyKey,
417434 } ;
@@ -420,7 +437,7 @@ class ProdWorker {
420437 // ..but we also have to be fast to avoid failing the task due to missing heartbeat
421438 for await ( const { delay, retry } of defaultBackoff . min ( 10 ) . maxRetries ( 7 ) ) {
422439 if ( retry > 0 ) {
423- logger . log ( "retrying ready for lazy attempt" , { retry } ) ;
440+ logger . log ( "retrying ready for lazy attempt" , { retry, idempotencyKey } ) ;
424441 }
425442
426443 this . #coordinatorSocket. socket . emit ( "READY_FOR_LAZY_ATTEMPT" , {
@@ -453,6 +470,93 @@ class ProdWorker {
453470 this . #failRun( this . runId , "Failed to receive execute request in a reasonable time" ) ;
454471 }
455472
/**
 * Tells the coordinator that this paused worker is ready to be resumed and
 * keeps re-sending that message until the replay is cancelled or superseded.
 *
 * Steps:
 *  1. Generate a fresh idempotency key and log the current resume state.
 *  2. Validate that `nextResumeAfter`, `attemptFriendlyId` and `attemptNumber`
 *     are set; if any is missing, log, emit an unrecoverable error and return.
 *  3. Record `{ idempotencyKey, type }` in `readyForResumeReplay`.
 *  4. Emit "READY_FOR_RESUME" (version "v2") once per iteration of
 *     `defaultBackoff.min(10).maxRetries(7)`, waiting the yielded delay after
 *     each emit.
 *
 * The loop exits early when `readyForResumeReplay` has been cleared, or when
 * it now holds a different idempotency key (a newer call has taken over).
 *
 * NOTE(review): if the backoff iterator finishes without either exit
 * condition, this method returns without any further action — confirm that
 * silently giving up is intended.
 *
 * @returns {Promise<void>}
 */
473+ async #readyForResume( ) {
// Identifies this particular invocation; compared after each delay below.
474+ const idempotencyKey = randomUUID ( ) ;
475+
476+ logger . log ( "readyForResume()" , {
477+ nextResumeAfter : this . nextResumeAfter ,
478+ attemptFriendlyId : this . attemptFriendlyId ,
479+ attemptNumber : this . attemptNumber ,
480+ idempotencyKey,
481+ } ) ;
482+
// Guard: a resume reason is required for the READY_FOR_RESUME payload.
483+ if ( ! this . nextResumeAfter ) {
484+ logger . error ( "Missing next resume reason" , { status : this . #status } ) ;
485+
486+ this . #emitUnrecoverableError(
487+ "NoNextResume" ,
488+ "Next resume reason not set while resuming from paused state"
489+ ) ;
490+
491+ return ;
492+ }
493+
// Guard: the attempt's friendly ID is required for the payload.
494+ if ( ! this . attemptFriendlyId ) {
495+ logger . error ( "Missing attempt friendly ID" , { status : this . #status } ) ;
496+
497+ this . #emitUnrecoverableError(
498+ "NoAttemptId" ,
499+ "Attempt ID not set while resuming from paused state"
500+ ) ;
501+
502+ return ;
503+ }
504+
// Guard: the attempt number is required for the payload.
// NOTE(review): this falsy check also rejects an attempt number of 0 —
// assumes attempt numbers start at 1; confirm.
505+ if ( ! this . attemptNumber ) {
506+ logger . error ( "Missing attempt number" , { status : this . #status } ) ;
507+
508+ this . #emitUnrecoverableError(
509+ "NoAttemptNumber" ,
510+ "Attempt number not set while resuming from paused state"
511+ ) ;
512+
513+ return ;
514+ }
515+
// Publish the replay record; other methods clear it to cancel the loop below.
516+ this . readyForResumeReplay = {
517+ idempotencyKey,
518+ type : this . nextResumeAfter ,
519+ } ;
520+
// Captured once, before the loop, so every re-emit carries the same values
// even if the instance fields change while this method is awaiting.
521+ const lockedMetadata = {
522+ attemptFriendlyId : this . attemptFriendlyId ,
523+ attemptNumber : this . attemptNumber ,
524+ type : this . nextResumeAfter ,
525+ } ;
526+
527+ // Retry if we don't receive RESUME_AFTER_DEPENDENCY or RESUME_AFTER_DURATION in a reasonable time
528+ // ..but we also have to be fast to avoid failing the task due to missing heartbeat
529+ for await ( const { delay, retry } of defaultBackoff . min ( 10 ) . maxRetries ( 7 ) ) {
530+ if ( retry > 0 ) {
531+ logger . log ( "retrying ready for resume" , { retry, idempotencyKey } ) ;
532+ }
533+
534+ this . #coordinatorSocket. socket . emit ( "READY_FOR_RESUME" , {
535+ version : "v2" ,
536+ ...lockedMetadata ,
537+ } ) ;
538+
// Wait for this iteration's delay before deciding whether to send again.
539+ await timeout ( delay . milliseconds ) ;
540+
// The record was cleared while we waited: the replay has been cancelled.
541+ if ( ! this . readyForResumeReplay ) {
542+ logger . log ( "replay ready for resume cancelled, discarding" , {
543+ idempotencyKey,
544+ } ) ;
545+
546+ return ;
547+ }
548+
// A different key means a newer #readyForResume call owns the replay now.
549+ if ( idempotencyKey !== this . readyForResumeReplay . idempotencyKey ) {
550+ logger . log ( "replay ready for resume idempotency key mismatch, discarding" , {
551+ idempotencyKey,
552+ newIdempotencyKey : this . readyForResumeReplay . idempotencyKey ,
553+ } ) ;
554+
555+ return ;
556+ }
557+ }
558+ }
559+
/**
 * Emits a "READY_FOR_CHECKPOINT" message (version "v1") on the coordinator
 * socket. Sent once; this method does not wait for a reply or retry.
 */
456560 #readyForCheckpoint( ) {
457561 this . #coordinatorSocket. socket . emit ( "READY_FOR_CHECKPOINT" , { version : "v1" } ) ;
458562 }
@@ -630,6 +734,7 @@ class ProdWorker {
630734 this . paused = false ;
631735 this . nextResumeAfter = undefined ;
632736 this . waitForPostStart = false ;
737+ this . readyForResumeReplay = undefined ;
633738
634739 for ( let i = 0 ; i < completions . length ; i ++ ) {
635740 const completion = completions [ i ] ;
@@ -845,46 +950,7 @@ class ProdWorker {
845950 }
846951
847952 if ( this . paused ) {
848- if ( ! this . nextResumeAfter ) {
849- logger . error ( "Missing next resume reason" , { status : this . #status } ) ;
850-
851- this . #emitUnrecoverableError(
852- "NoNextResume" ,
853- "Next resume reason not set while resuming from paused state"
854- ) ;
855-
856- return ;
857- }
858-
859- if ( ! this . attemptFriendlyId ) {
860- logger . error ( "Missing attempt friendly ID" , { status : this . #status } ) ;
861-
862- this . #emitUnrecoverableError(
863- "NoAttemptId" ,
864- "Attempt ID not set while resuming from paused state"
865- ) ;
866-
867- return ;
868- }
869-
870- if ( ! this . attemptNumber ) {
871- logger . error ( "Missing attempt number" , { status : this . #status } ) ;
872-
873- this . #emitUnrecoverableError(
874- "NoAttemptNumber" ,
875- "Attempt number not set while resuming from paused state"
876- ) ;
877-
878- return ;
879- }
880-
881- socket . emit ( "READY_FOR_RESUME" , {
882- version : "v2" ,
883- attemptFriendlyId : this . attemptFriendlyId ,
884- attemptNumber : this . attemptNumber ,
885- type : this . nextResumeAfter ,
886- } ) ;
887-
953+ await this . #readyForResume( ) ;
888954 return ;
889955 }
890956
@@ -1293,6 +1359,9 @@ class ProdWorker {
12931359 attemptNumber : this . attemptNumber ,
12941360 waitForTaskReplay : this . waitForTaskReplay ,
12951361 waitForBatchReplay : this . waitForBatchReplay ,
1362+ readyForLazyAttemptReplay : this . readyForLazyAttemptReplay ,
1363+ durationResumeFallback : this . durationResumeFallback ,
1364+ readyForResumeReplay : this . readyForResumeReplay ,
12961365 } ;
12971366 }
12981367
0 commit comments