@@ -16,7 +16,7 @@ vi.mock("~/services/platform.v3.server", async (importOriginal) => {
1616
1717import { RunEngine } from "@internal/run-engine" ;
1818import { setupAuthenticatedEnvironment , setupBackgroundWorker } from "@internal/run-engine/tests" ;
19- import { containerTest } from "@internal/testcontainers" ;
19+ import { assertNonNullable , containerTest } from "@internal/testcontainers" ;
2020import { trace } from "@opentelemetry/api" ;
2121import { IOPacket } from "@trigger.dev/core/v3" ;
2222import { TaskRun } from "@trigger.dev/database" ;
@@ -31,11 +31,15 @@ import {
3131 TagValidationParams ,
3232 TracedEventSpan ,
3333 TraceEventConcern ,
34+ TriggerRacepoints ,
35+ TriggerRacepointSystem ,
3436 TriggerTaskRequest ,
3537 TriggerTaskValidator ,
3638 ValidationResult ,
3739} from "~/runEngine/types" ;
3840import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server" ;
41+ import { promiseWithResolvers } from "@trigger.dev/core" ;
42+ import { setTimeout } from "node:timers/promises" ;
3943
4044vi . setConfig ( { testTimeout : 30_000 } ) ; // 30 seconds timeout
4145
@@ -108,6 +112,29 @@ class MockTraceEventConcern implements TraceEventConcern {
108112 }
109113}
110114
115+ type TriggerRacepoint = { promise : Promise < void > ; resolve : ( value : void ) => void } ;
116+
117+ class MockTriggerRacepointSystem implements TriggerRacepointSystem {
118+ private racepoints : Record < string , TriggerRacepoint | undefined > = { } ;
119+
120+ async waitForRacepoint ( { id } : { racepoint : TriggerRacepoints ; id : string } ) : Promise < void > {
121+ const racepoint = this . racepoints [ id ] ;
122+
123+ if ( racepoint ) {
124+ return racepoint . promise ;
125+ }
126+
127+ return Promise . resolve ( ) ;
128+ }
129+
130+ registerRacepoint ( racepoint : TriggerRacepoints , id : string ) : TriggerRacepoint {
131+ const { promise, resolve } = promiseWithResolvers < void > ( ) ;
132+ this . racepoints [ id ] = { promise, resolve } ;
133+
134+ return { promise, resolve } ;
135+ }
136+ }
137+
111138describe ( "RunEngineTriggerTaskService" , ( ) => {
112139 containerTest ( "should trigger a task with minimal options" , async ( { prisma, redisOptions } ) => {
113140 const engine = new RunEngine ( {
@@ -312,6 +339,228 @@ describe("RunEngineTriggerTaskService", () => {
312339 await engine . quit ( ) ;
313340 } ) ;
314341
342+ containerTest (
343+ "should handle idempotency keys when the engine throws an RunDuplicateIdempotencyKeyError" ,
344+ async ( { prisma, redisOptions } ) => {
345+ const engine = new RunEngine ( {
346+ prisma,
347+ worker : {
348+ redis : redisOptions ,
349+ workers : 1 ,
350+ tasksPerWorker : 10 ,
351+ pollIntervalMs : 100 ,
352+ } ,
353+ queue : {
354+ redis : redisOptions ,
355+ } ,
356+ runLock : {
357+ redis : redisOptions ,
358+ } ,
359+ machines : {
360+ defaultMachine : "small-1x" ,
361+ machines : {
362+ "small-1x" : {
363+ name : "small-1x" as const ,
364+ cpu : 0.5 ,
365+ memory : 0.5 ,
366+ centsPerMs : 0.0001 ,
367+ } ,
368+ } ,
369+ baseCostInCents : 0.0005 ,
370+ } ,
371+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
372+ logLevel : "debug" ,
373+ } ) ;
374+
375+ const parentTask = "parent-task" ;
376+
377+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
378+
379+ const taskIdentifier = "test-task" ;
380+
381+ //create background worker
382+ await setupBackgroundWorker ( engine , authenticatedEnvironment , [ parentTask , taskIdentifier ] ) ;
383+
384+ const parentRun1 = await engine . trigger (
385+ {
386+ number : 1 ,
387+ friendlyId : "run_p1" ,
388+ environment : authenticatedEnvironment ,
389+ taskIdentifier : parentTask ,
390+ payload : "{}" ,
391+ payloadType : "application/json" ,
392+ context : { } ,
393+ traceContext : { } ,
394+ traceId : "t12345" ,
395+ spanId : "s12345" ,
396+ queue : `task/${ parentTask } ` ,
397+ isTest : false ,
398+ tags : [ ] ,
399+ workerQueue : "main" ,
400+ } ,
401+ prisma
402+ ) ;
403+
404+ //dequeue parent and create the attempt
405+ await setTimeout ( 500 ) ;
406+ const dequeued = await engine . dequeueFromWorkerQueue ( {
407+ consumerId : "test_12345" ,
408+ workerQueue : "main" ,
409+ } ) ;
410+ await engine . startRunAttempt ( {
411+ runId : parentRun1 . id ,
412+ snapshotId : dequeued [ 0 ] . snapshot . id ,
413+ } ) ;
414+
415+ const parentRun2 = await engine . trigger (
416+ {
417+ number : 2 ,
418+ friendlyId : "run_p2" ,
419+ environment : authenticatedEnvironment ,
420+ taskIdentifier : parentTask ,
421+ payload : "{}" ,
422+ payloadType : "application/json" ,
423+ context : { } ,
424+ traceContext : { } ,
425+ traceId : "t12346" ,
426+ spanId : "s12346" ,
427+ queue : `task/${ parentTask } ` ,
428+ isTest : false ,
429+ tags : [ ] ,
430+ workerQueue : "main" ,
431+ } ,
432+ prisma
433+ ) ;
434+
435+ await setTimeout ( 500 ) ;
436+ const dequeued2 = await engine . dequeueFromWorkerQueue ( {
437+ consumerId : "test_12345" ,
438+ workerQueue : "main" ,
439+ } ) ;
440+ await engine . startRunAttempt ( {
441+ runId : parentRun2 . id ,
442+ snapshotId : dequeued2 [ 0 ] . snapshot . id ,
443+ } ) ;
444+
445+ const queuesManager = new DefaultQueueManager ( prisma , engine ) ;
446+
447+ const idempotencyKeyConcern = new IdempotencyKeyConcern (
448+ prisma ,
449+ engine ,
450+ new MockTraceEventConcern ( )
451+ ) ;
452+
453+ const triggerRacepointSystem = new MockTriggerRacepointSystem ( ) ;
454+
455+ const triggerTaskService = new RunEngineTriggerTaskService ( {
456+ engine,
457+ prisma,
458+ runNumberIncrementer : new MockRunNumberIncrementer ( ) ,
459+ payloadProcessor : new MockPayloadProcessor ( ) ,
460+ queueConcern : queuesManager ,
461+ idempotencyKeyConcern,
462+ validator : new MockTriggerTaskValidator ( ) ,
463+ traceEventConcern : new MockTraceEventConcern ( ) ,
464+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
465+ metadataMaximumSize : 1024 * 1024 * 1 , // 1MB
466+ triggerRacepointSystem,
467+ } ) ;
468+
469+ const idempotencyKey = "test-idempotency-key" ;
470+
471+ const racepoint = triggerRacepointSystem . registerRacepoint ( "idempotencyKey" , idempotencyKey ) ;
472+
473+ const childTriggerPromise1 = triggerTaskService . call ( {
474+ taskId : taskIdentifier ,
475+ environment : authenticatedEnvironment ,
476+ body : {
477+ payload : { test : "test" } ,
478+ options : {
479+ idempotencyKey,
480+ parentRunId : parentRun1 . friendlyId ,
481+ resumeParentOnCompletion : true ,
482+ } ,
483+ } ,
484+ } ) ;
485+
486+ const childTriggerPromise2 = triggerTaskService . call ( {
487+ taskId : taskIdentifier ,
488+ environment : authenticatedEnvironment ,
489+ body : {
490+ payload : { test : "test" } ,
491+ options : {
492+ idempotencyKey,
493+ parentRunId : parentRun2 . friendlyId ,
494+ resumeParentOnCompletion : true ,
495+ } ,
496+ } ,
497+ } ) ;
498+
499+ await setTimeout ( 500 ) ;
500+
501+ // Now we can resolve the racepoint
502+ racepoint . resolve ( ) ;
503+
504+ const result = await childTriggerPromise1 ;
505+ const result2 = await childTriggerPromise2 ;
506+
507+ expect ( result ) . toBeDefined ( ) ;
508+ expect ( result ?. run . friendlyId ) . toBeDefined ( ) ;
509+ expect ( result ?. run . status ) . toBe ( "PENDING" ) ;
510+
511+ const run = await prisma . taskRun . findUnique ( {
512+ where : {
513+ id : result ?. run . id ,
514+ } ,
515+ } ) ;
516+
517+ expect ( run ) . toBeDefined ( ) ;
518+ expect ( run ?. friendlyId ) . toBe ( result ?. run . friendlyId ) ;
519+ expect ( run ?. engine ) . toBe ( "V2" ) ;
520+ expect ( run ?. queuedAt ) . toBeDefined ( ) ;
521+ expect ( run ?. queue ) . toBe ( `task/${ taskIdentifier } ` ) ;
522+
523+ expect ( result2 ) . toBeDefined ( ) ;
524+ expect ( result2 ?. run . friendlyId ) . toBe ( result ?. run . friendlyId ) ;
525+
526+ const parent1ExecutionData = await engine . getRunExecutionData ( { runId : parentRun1 . id } ) ;
527+ assertNonNullable ( parent1ExecutionData ) ;
528+ expect ( parent1ExecutionData . snapshot . executionStatus ) . toBe ( "EXECUTING_WITH_WAITPOINTS" ) ;
529+
530+ const parent2ExecutionData = await engine . getRunExecutionData ( { runId : parentRun2 . id } ) ;
531+ assertNonNullable ( parent2ExecutionData ) ;
532+ expect ( parent2ExecutionData . snapshot . executionStatus ) . toBe ( "EXECUTING_WITH_WAITPOINTS" ) ;
533+
534+ const parent1RunWaitpoint = await prisma . taskRunWaitpoint . findFirst ( {
535+ where : {
536+ taskRunId : parentRun1 . id ,
537+ } ,
538+ include : {
539+ waitpoint : true ,
540+ } ,
541+ } ) ;
542+
543+ assertNonNullable ( parent1RunWaitpoint ) ;
544+ expect ( parent1RunWaitpoint . waitpoint . type ) . toBe ( "RUN" ) ;
545+ expect ( parent1RunWaitpoint . waitpoint . completedByTaskRunId ) . toBe ( result ?. run . id ) ;
546+
547+ const parent2RunWaitpoint = await prisma . taskRunWaitpoint . findFirst ( {
548+ where : {
549+ taskRunId : parentRun2 . id ,
550+ } ,
551+ include : {
552+ waitpoint : true ,
553+ } ,
554+ } ) ;
555+
556+ assertNonNullable ( parent2RunWaitpoint ) ;
557+ expect ( parent2RunWaitpoint . waitpoint . type ) . toBe ( "RUN" ) ;
558+ expect ( parent2RunWaitpoint . waitpoint . completedByTaskRunId ) . toBe ( result2 ?. run . id ) ;
559+
560+ await engine . quit ( ) ;
561+ }
562+ ) ;
563+
315564 containerTest (
316565 "should resolve queue names correctly when locked to version" ,
317566 async ( { prisma, redisOptions } ) => {
0 commit comments