@@ -2,6 +2,7 @@ import type { RawEvent, SendEventOptions } from "@trigger.dev/core";
22import { $transaction , PrismaClientOrTransaction , PrismaErrorSchema , prisma } from "~/db.server" ;
33import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
44import { workerQueue } from "~/services/worker.server" ;
5+ import { logger } from "../logger.server" ;
56
67export class IngestSendEvent {
78 #prismaClient: PrismaClientOrTransaction ;
@@ -33,71 +34,87 @@ export class IngestSendEvent {
3334 try {
3435 const deliverAt = this . #calculateDeliverAt( options ) ;
3536
36- return await $transaction ( this . #prismaClient, async ( tx ) => {
37- const externalAccount = options ?. accountId
38- ? await tx . externalAccount . findUniqueOrThrow ( {
39- where : {
40- environmentId_identifier : {
41- environmentId : environment . id ,
42- identifier : options . accountId ,
37+ return await $transaction (
38+ this . #prismaClient,
39+ async ( tx ) => {
40+ const externalAccount = options ?. accountId
41+ ? await tx . externalAccount . findUniqueOrThrow ( {
42+ where : {
43+ environmentId_identifier : {
44+ environmentId : environment . id ,
45+ identifier : options . accountId ,
46+ } ,
4347 } ,
44- } ,
45- } )
46- : undefined ;
48+ } )
49+ : undefined ;
4750
48- // Create a new event in the database
49- const eventLog = await tx . eventRecord . create ( {
50- data : {
51- organization : {
52- connect : {
53- id : environment . organizationId ,
51+ // Create a new event in the database
52+ const eventLog = await tx . eventRecord . create ( {
53+ data : {
54+ organization : {
55+ connect : {
56+ id : environment . organizationId ,
57+ } ,
5458 } ,
55- } ,
56- project : {
57- connect : {
58- id : environment . projectId ,
59+ project : {
60+ connect : {
61+ id : environment . projectId ,
62+ } ,
5963 } ,
60- } ,
61- environment : {
62- connect : {
63- id : environment . id ,
64+ environment : {
65+ connect : {
66+ id : environment . id ,
67+ } ,
6468 } ,
69+ eventId : event . id ,
70+ name : event . name ,
71+ timestamp : event . timestamp ?? new Date ( ) ,
72+ payload : event . payload ?? { } ,
73+ context : event . context ?? { } ,
74+ source : event . source ?? "trigger.dev" ,
75+ sourceContext,
76+ deliverAt : deliverAt ,
77+ externalAccount : externalAccount
78+ ? {
79+ connect : {
80+ id : externalAccount . id ,
81+ } ,
82+ }
83+ : { } ,
6584 } ,
66- eventId : event . id ,
67- name : event . name ,
68- timestamp : event . timestamp ?? new Date ( ) ,
69- payload : event . payload ?? { } ,
70- context : event . context ?? { } ,
71- source : event . source ?? "trigger.dev" ,
72- sourceContext,
73- deliverAt : deliverAt ,
74- externalAccount : externalAccount
75- ? {
76- connect : {
77- id : externalAccount . id ,
78- } ,
79- }
80- : { } ,
81- } ,
82- } ) ;
85+ } ) ;
8386
84- if ( this . deliverEvents ) {
85- // Produce a message to the event bus
86- await workerQueue . enqueue (
87- "deliverEvent" ,
88- {
89- id : eventLog . id ,
90- } ,
91- { runAt : eventLog . deliverAt , tx, jobKey : `event:${ eventLog . id } ` }
92- ) ;
93- }
87+ if ( this . deliverEvents ) {
88+ // Produce a message to the event bus
89+ await workerQueue . enqueue (
90+ "deliverEvent" ,
91+ {
92+ id : eventLog . id ,
93+ } ,
94+ { runAt : eventLog . deliverAt , tx, jobKey : `event:${ eventLog . id } ` }
95+ ) ;
96+ }
9497
95- return eventLog ;
96- } ) ;
98+ return eventLog ;
99+ } ,
100+ { rethrowPrismaErrors : true }
101+ ) ;
97102 } catch ( error ) {
98103 const prismaError = PrismaErrorSchema . safeParse ( error ) ;
104+
105+ if ( ! prismaError . success ) {
106+ logger . debug ( "Error parsing prisma error" , {
107+ error,
108+ parseError : prismaError . error . format ( ) ,
109+ } ) ;
110+
111+ throw error ;
112+ }
113+
99114 // If the error is a Prisma unique constraint error, it means that the event already exists
100115 if ( prismaError . success && prismaError . data . code === "P2002" ) {
116+ logger . debug ( "Event already exists, finding and returning" , { event, environment } ) ;
117+
101118 return this . #prismaClient. eventRecord . findUniqueOrThrow ( {
102119 where : {
103120 eventId_environmentId : {
0 commit comments