11import { Attributes } from "@opentelemetry/api" ;
22import {
3+ MachinePresetName ,
34 TaskRunContext ,
45 TaskRunError ,
56 TaskRunErrorCodes ,
89 TaskRunExecutionRetry ,
910 TaskRunFailedExecutionResult ,
1011 TaskRunSuccessfulExecutionResult ,
11- exceptionEventEnhancer ,
1212 flattenAttributes ,
13- internalErrorFromUnexpectedExit ,
1413 isManualOutOfMemoryError ,
1514 sanitizeError ,
1615 shouldRetryError ,
@@ -32,8 +31,8 @@ import { CancelAttemptService } from "./cancelAttempt.server";
3231import { CreateCheckpointService } from "./createCheckpoint.server" ;
3332import { FinalizeTaskRunService } from "./finalizeTaskRun.server" ;
3433import { RetryAttemptService } from "./retryAttempt.server" ;
35- import { updateMetadataService } from "~/services/metadata/updateMetadata.server" ;
3634import { getTaskEventStoreTableForRun } from "../taskEventStore.server" ;
35+ import { socketIo } from "../handleSocketIo.server" ;
3736
3837type FoundAttempt = Awaited < ReturnType < typeof findAttempt > > ;
3938
@@ -256,9 +255,12 @@ export class CompleteAttemptService extends BaseService {
256255
257256 let retriableError = shouldRetryError ( taskRunErrorEnhancer ( completion . error ) ) ;
258257 let isOOMRetry = false ;
258+ let isOOMAttempt = isOOMError ( completion . error ) ;
259+ let isOnMaxOOMMachine = false ;
260+ let oomMachine : MachinePresetName | undefined ;
259261
260- //OOM errors should retry (if an OOM machine is specified)
261- if ( isOOMError ( completion . error ) ) {
262+ //OOM errors should retry (if an OOM machine is specified, and we're not already on it )
263+ if ( isOOMAttempt ) {
262264 const retryConfig = FailedTaskRunRetryHelper . getRetryConfig ( {
263265 run : {
264266 ...taskRunAttempt . taskRun ,
@@ -268,10 +270,10 @@ export class CompleteAttemptService extends BaseService {
268270 execution,
269271 } ) ;
270272
271- if (
272- retryConfig ?. outOfMemory ?. machine &&
273- retryConfig . outOfMemory . machine !== taskRunAttempt . taskRun . machinePreset
274- ) {
273+ oomMachine = retryConfig ?. outOfMemory ?. machine ;
274+ isOnMaxOOMMachine = oomMachine === taskRunAttempt . taskRun . machinePreset ;
275+
276+ if ( oomMachine && ! isOnMaxOOMMachine ) {
275277 //we will retry
276278 isOOMRetry = true ;
277279 retriableError = true ;
@@ -290,7 +292,7 @@ export class CompleteAttemptService extends BaseService {
290292 id : taskRunAttempt . taskRunId ,
291293 } ,
292294 data : {
293- machinePreset : retryConfig . outOfMemory . machine ,
295+ machinePreset : oomMachine ,
294296 } ,
295297 } ) ;
296298 }
@@ -309,11 +311,17 @@ export class CompleteAttemptService extends BaseService {
309311 environment,
310312 checkpoint,
311313 forceRequeue : isOOMRetry ,
314+ oomMachine,
312315 } ) ;
313316 }
314317
315318 // The attempt has failed and we won't retry
316319
320+ if ( isOOMAttempt && isOnMaxOOMMachine && environment . type !== "DEVELOPMENT" ) {
321+ // The attempt failed due to an OOM error but we're already on the machine we should retry on
322+ exitRun ( taskRunAttempt . taskRunId ) ;
323+ }
324+
317325 // Now we need to "complete" the task run event/span
318326 await eventRepository . completeEvent (
319327 getTaskEventStoreTableForRun ( taskRunAttempt . taskRun ) ,
@@ -507,6 +515,11 @@ export class CompleteAttemptService extends BaseService {
507515
508516 if ( forceRequeue ) {
509517 logger . debug ( "[CompleteAttemptService] Forcing retry via queue" , { runId : run . id } ) ;
518+
519+ // The run won't know it should shut down as we make the decision to force requeue here
520+ // This also ensures that this change is backwards compatible with older workers
521+ exitRun ( run . id ) ;
522+
510523 await retryViaQueue ( ) ;
511524 return ;
512525 }
@@ -544,6 +557,7 @@ export class CompleteAttemptService extends BaseService {
544557 environment,
545558 checkpoint,
546559 forceRequeue = false ,
560+ oomMachine,
547561 } : {
548562 execution : TaskRunExecution ;
549563 executionRetry : TaskRunExecutionRetry ;
@@ -552,29 +566,38 @@ export class CompleteAttemptService extends BaseService {
552566 environment : AuthenticatedEnvironment ;
553567 checkpoint ?: CheckpointData ;
554568 forceRequeue ?: boolean ;
569+ /** Setting this will also alter the retry span message */
570+ oomMachine ?: MachinePresetName ;
555571 } ) {
556572 const retryAt = new Date ( executionRetry . timestamp ) ;
557573
558574 // Retry the task run
559- await eventRepository . recordEvent ( `Retry #${ execution . attempt . number } delay` , {
560- taskSlug : taskRunAttempt . taskRun . taskIdentifier ,
561- environment,
562- attributes : {
563- metadata : this . #generateMetadataAttributesForNextAttempt( execution ) ,
564- properties : {
565- retryAt : retryAt . toISOString ( ) ,
566- } ,
567- runId : taskRunAttempt . taskRun . friendlyId ,
568- style : {
569- icon : "schedule-attempt" ,
575+ await eventRepository . recordEvent (
576+ `Retry #${ execution . attempt . number } delay${ oomMachine ? " after OOM" : "" } ` ,
577+ {
578+ taskSlug : taskRunAttempt . taskRun . taskIdentifier ,
579+ environment,
580+ attributes : {
581+ metadata : this . #generateMetadataAttributesForNextAttempt( execution ) ,
582+ properties : {
583+ retryAt : retryAt . toISOString ( ) ,
584+ previousMachine : oomMachine
585+ ? taskRunAttempt . taskRun . machinePreset ?? undefined
586+ : undefined ,
587+ nextMachine : oomMachine ,
588+ } ,
589+ runId : taskRunAttempt . taskRun . friendlyId ,
590+ style : {
591+ icon : "schedule-attempt" ,
592+ } ,
593+ queueId : taskRunAttempt . queueId ,
594+ queueName : taskRunAttempt . taskRun . queue ,
570595 } ,
571- queueId : taskRunAttempt . queueId ,
572- queueName : taskRunAttempt . taskRun . queue ,
573- } ,
574- context : taskRunAttempt . taskRun . traceContext as Record < string , string | undefined > ,
575- spanIdSeed : `retry-${ taskRunAttempt . number + 1 } ` ,
576- endTime : retryAt ,
577- } ) ;
596+ context : taskRunAttempt . taskRun . traceContext as Record < string , string | undefined > ,
597+ spanIdSeed : `retry-${ taskRunAttempt . number + 1 } ` ,
598+ endTime : retryAt ,
599+ }
600+ ) ;
578601
579602 logger . debug ( "[CompleteAttemptService] Retrying" , {
580603 taskRun : taskRunAttempt . taskRun . friendlyId ,
@@ -753,3 +776,10 @@ function isOOMError(error: TaskRunError) {
753776
754777 return false ;
755778}
779+
/**
 * Requests cancellation of a run through the coordinator socket namespace.
 *
 * Emits a `REQUEST_RUN_CANCELLATION` message (payload version `v1`) carrying
 * the run id. Nothing is awaited and nothing is returned, so callers get no
 * confirmation that the request was acted on.
 *
 * @param runId - ID of the run to request cancellation for.
 */
function exitRun(runId: string) {
  const { coordinatorNamespace } = socketIo;

  coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { version: "v1", runId });
}
0 commit comments