@@ -3,6 +3,7 @@ import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
33import { logger } from "~/services/logger.server" ;
44import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server" ;
55import type { RunEngine } from "~/v3/runEngine.server" ;
6+ import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus" ;
67import type { TraceEventConcern , TriggerTaskRequest } from "../types" ;
78
89export type IdempotencyKeyConcernResult =
@@ -41,6 +42,7 @@ export class IdempotencyKeyConcern {
4142 : undefined ;
4243
4344 if ( existingRun ) {
45+ // The idempotency key has expired
4446 if ( existingRun . idempotencyKeyExpiresAt && existingRun . idempotencyKeyExpiresAt < new Date ( ) ) {
4547 logger . debug ( "[TriggerTaskService][call] Idempotency key has expired" , {
4648 idempotencyKey : request . options ?. idempotencyKey ,
@@ -52,42 +54,62 @@ export class IdempotencyKeyConcern {
5254 where : { id : existingRun . id , idempotencyKey } ,
5355 data : { idempotencyKey : null , idempotencyKeyExpiresAt : null } ,
5456 } ) ;
55- } else {
56- const associatedWaitpoint = existingRun . associatedWaitpoint ;
57- const parentRunId = request . body . options ?. parentRunId ;
58- const resumeParentOnCompletion = request . body . options ?. resumeParentOnCompletion ;
59- //We're using `andWait` so we need to block the parent run with a waitpoint
60- if ( associatedWaitpoint && resumeParentOnCompletion && parentRunId ) {
61- await this . traceEventConcern . traceIdempotentRun (
62- request ,
63- {
64- existingRun,
65- idempotencyKey,
66- incomplete : associatedWaitpoint . status === "PENDING" ,
67- isError : associatedWaitpoint . outputIsError ,
68- } ,
69- async ( event ) => {
70- //block run with waitpoint
71- await this . engine . blockRunWithWaitpoint ( {
72- runId : RunId . fromFriendlyId ( parentRunId ) ,
73- waitpoints : associatedWaitpoint . id ,
74- spanIdToComplete : event . spanId ,
75- batch : request . options ?. batchId
76- ? {
77- id : request . options . batchId ,
78- index : request . options . batchIndex ?? 0 ,
79- }
80- : undefined ,
81- projectId : request . environment . projectId ,
82- organizationId : request . environment . organizationId ,
83- tx : this . prisma ,
84- } ) ;
85- }
86- ) ;
87- }
8857
89- return { isCached : true , run : existingRun } ;
58+ return { isCached : false , idempotencyKey , idempotencyKeyExpiresAt } ;
9059 }
60+
61+ // If the existing run failed or was expired, we clear the key and do a new run
62+ if ( shouldIdempotencyKeyBeCleared ( existingRun . status ) ) {
63+ logger . debug ( "[TriggerTaskService][call] Idempotency key should be cleared" , {
64+ idempotencyKey : request . options ?. idempotencyKey ,
65+ runStatus : existingRun . status ,
66+ runId : existingRun . id ,
67+ } ) ;
68+
69+ // Update the existing run to remove the idempotency key
70+ await this . prisma . taskRun . updateMany ( {
71+ where : { id : existingRun . id , idempotencyKey } ,
72+ data : { idempotencyKey : null , idempotencyKeyExpiresAt : null } ,
73+ } ) ;
74+
75+ return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
76+ }
77+
78+ // We have an idempotent run, so we return it
79+ const associatedWaitpoint = existingRun . associatedWaitpoint ;
80+ const parentRunId = request . body . options ?. parentRunId ;
81+ const resumeParentOnCompletion = request . body . options ?. resumeParentOnCompletion ;
82+ //We're using `andWait` so we need to block the parent run with a waitpoint
83+ if ( associatedWaitpoint && resumeParentOnCompletion && parentRunId ) {
84+ await this . traceEventConcern . traceIdempotentRun (
85+ request ,
86+ {
87+ existingRun,
88+ idempotencyKey,
89+ incomplete : associatedWaitpoint . status === "PENDING" ,
90+ isError : associatedWaitpoint . outputIsError ,
91+ } ,
92+ async ( event ) => {
93+ //block run with waitpoint
94+ await this . engine . blockRunWithWaitpoint ( {
95+ runId : RunId . fromFriendlyId ( parentRunId ) ,
96+ waitpoints : associatedWaitpoint . id ,
97+ spanIdToComplete : event . spanId ,
98+ batch : request . options ?. batchId
99+ ? {
100+ id : request . options . batchId ,
101+ index : request . options . batchIndex ?? 0 ,
102+ }
103+ : undefined ,
104+ projectId : request . environment . projectId ,
105+ organizationId : request . environment . organizationId ,
106+ tx : this . prisma ,
107+ } ) ;
108+ }
109+ ) ;
110+ }
111+
112+ return { isCached : true , run : existingRun } ;
91113 }
92114
93115 return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
0 commit comments