@@ -3,6 +3,25 @@ import { $transaction, PrismaClientOrTransaction, PrismaErrorSchema, prisma } fr
33import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
44import { workerQueue } from "~/services/worker.server" ;
55import { logger } from "../logger.server" ;
6+ import { EventRecord , ExternalAccount } from "@trigger.dev/database" ;
7+
8+ type UpdateEventInput = {
9+ tx : PrismaClientOrTransaction ;
10+ existingEventLog : EventRecord ;
11+ reqEvent : RawEvent ;
12+ deliverAt ?: Date ;
13+ } ;
14+
15+ type CreateEventInput = {
16+ tx : PrismaClientOrTransaction ;
17+ event : RawEvent ;
18+ environment : AuthenticatedEnvironment ;
19+ deliverAt ?: Date ;
20+ sourceContext ?: { id : string ; metadata ?: any } ;
21+ externalAccount ?: ExternalAccount ;
22+ } ;
23+
24+ const EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS = 5 * 1000 ; // 5 seconds
625
726export class IngestSendEvent {
827 #prismaClient: PrismaClientOrTransaction ;
@@ -52,34 +71,25 @@ export class IngestSendEvent {
5271 } )
5372 : undefined ;
5473
55- // Create a new event in the database
56- const eventLog = await tx . eventRecord . create ( {
57- data : {
58- organizationId : environment . organizationId ,
59- projectId : environment . projectId ,
60- environmentId : environment . id ,
61- eventId : event . id ,
62- name : event . name ,
63- timestamp : event . timestamp ?? new Date ( ) ,
64- payload : event . payload ?? { } ,
65- context : event . context ?? { } ,
66- source : event . source ?? "trigger.dev" ,
67- sourceContext,
68- deliverAt : deliverAt ,
69- externalAccountId : externalAccount ? externalAccount . id : undefined ,
74+ const existingEventLog = await tx . eventRecord . findUnique ( {
75+ where : {
76+ eventId_environmentId : {
77+ eventId : event . id ,
78+ environmentId : environment . id ,
79+ } ,
7080 } ,
7181 } ) ;
7282
73- if ( this . deliverEvents ) {
74- // Produce a message to the event bus
75- await workerQueue . enqueue (
76- "deliverEvent" ,
77- {
78- id : eventLog . id ,
79- } ,
80- { runAt : eventLog . deliverAt , tx , jobKey : `event: ${ eventLog . id } ` }
81- ) ;
82- }
83+ const eventLog = await ( existingEventLog
84+ ? this . updateEvent ( { tx , existingEventLog , reqEvent : event , deliverAt } )
85+ : this . createEvent ( {
86+ tx ,
87+ event ,
88+ environment ,
89+ deliverAt ,
90+ sourceContext ,
91+ externalAccount ,
92+ } ) ) ;
8393
8494 return eventLog ;
8595 } ) ;
@@ -95,21 +105,81 @@ export class IngestSendEvent {
95105 throw error ;
96106 }
97107
98- // If the error is a Prisma unique constraint error, it means that the event already exists
99- if ( prismaError . success && prismaError . data . code === "P2002" ) {
100- logger . debug ( "Event already exists, finding and returning" , { event , environment } ) ;
108+ throw error ;
109+ }
110+ }
101111
102- return this . #prismaClient. eventRecord . findUniqueOrThrow ( {
103- where : {
104- eventId_environmentId : {
105- eventId : event . id ,
106- environmentId : environment . id ,
107- } ,
108- } ,
109- } ) ;
110- }
112+ private async createEvent ( {
113+ tx,
114+ event,
115+ environment,
116+ deliverAt,
117+ sourceContext,
118+ externalAccount,
119+ } : CreateEventInput ) {
120+ const eventLog = await tx . eventRecord . create ( {
121+ data : {
122+ organizationId : environment . organizationId ,
123+ projectId : environment . projectId ,
124+ environmentId : environment . id ,
125+ eventId : event . id ,
126+ name : event . name ,
127+ timestamp : event . timestamp ?? new Date ( ) ,
128+ payload : event . payload ?? { } ,
129+ context : event . context ?? { } ,
130+ source : event . source ?? "trigger.dev" ,
131+ sourceContext,
132+ deliverAt : deliverAt ,
133+ externalAccountId : externalAccount ? externalAccount . id : undefined ,
134+ } ,
135+ } ) ;
111136
112- throw error ;
137+ await this . enqueueWorkerEvent ( tx , eventLog ) ;
138+
139+ return eventLog ;
140+ }
141+
142+ private async updateEvent ( { tx, existingEventLog, reqEvent, deliverAt } : UpdateEventInput ) {
143+ if ( ! this . shouldUpdateEvent ( existingEventLog ) ) {
144+ logger . debug ( `not updating event for event id: ${ existingEventLog . eventId } ` ) ;
145+ return existingEventLog ;
146+ }
147+
148+ const updatedEventLog = await tx . eventRecord . update ( {
149+ where : {
150+ eventId_environmentId : {
151+ eventId : existingEventLog . eventId ,
152+ environmentId : existingEventLog . environmentId ,
153+ } ,
154+ } ,
155+ data : {
156+ payload : reqEvent . payload ?? existingEventLog . payload ,
157+ context : reqEvent . context ?? existingEventLog . context ,
158+ deliverAt : deliverAt ?? new Date ( ) ,
159+ } ,
160+ } ) ;
161+
162+ await this . enqueueWorkerEvent ( tx , updatedEventLog ) ;
163+
164+ return updatedEventLog ;
165+ }
166+
167+ private shouldUpdateEvent ( eventLog : EventRecord ) {
168+ const thresholdTime = new Date ( Date . now ( ) + EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS ) ;
169+
170+ return eventLog . deliverAt >= thresholdTime ;
171+ }
172+
173+ private async enqueueWorkerEvent ( tx : PrismaClientOrTransaction , eventLog : EventRecord ) {
174+ if ( this . deliverEvents ) {
175+ // Produce a message to the event bus
176+ await workerQueue . enqueue (
177+ "deliverEvent" ,
178+ {
179+ id : eventLog . id ,
180+ } ,
181+ { runAt : eventLog . deliverAt , tx, jobKey : `event:${ eventLog . id } ` }
182+ ) ;
113183 }
114184 }
115185}
0 commit comments