@@ -7,114 +7,152 @@ import { MACHINE_METADATA } from "./constants.js";
77import { EventCache } from "./eventCache.js" ;
88import nodeMachineId from "node-machine-id" ;
99import { getDeviceId } from "@mongodb-js/device-id" ;
10+ import fs from "fs/promises" ;
11+
12+ async function fileExists ( filePath : string ) : Promise < boolean > {
13+ try {
14+ await fs . access ( filePath , fs . constants . F_OK ) ;
15+ return true ; // File exists
16+ } catch ( e : unknown ) {
17+ if (
18+ e instanceof Error &&
19+ (
20+ e as Error & {
21+ code : string ;
22+ }
23+ ) . code === "ENOENT"
24+ ) {
25+ return false ; // File does not exist
26+ }
27+ throw e ; // Re-throw unexpected errors
28+ }
29+ }
1030
11- type EventResult = {
12- success : boolean ;
13- error ?: Error ;
14- } ;
31+ async function isContainerized ( ) : Promise < boolean > {
32+ if ( process . env . container ) {
33+ return true ;
34+ }
35+
36+ const exists = await Promise . all ( [ "/.dockerenv" , "/run/.containerenv" , "/var/run/.containerenv" ] . map ( fileExists ) ) ;
1537
16- export const DEVICE_ID_TIMEOUT = 3000 ;
38+ return exists . includes ( true ) ;
39+ }
1740
1841export class Telemetry {
19- private isBufferingEvents : boolean = true ;
20- /** Resolves when the device ID is retrieved or timeout occurs */
21- public deviceIdPromise : Promise < string > | undefined ;
2242 private deviceIdAbortController = new AbortController ( ) ;
2343 private eventCache : EventCache ;
2444 private getRawMachineId : ( ) => Promise < string > ;
45+ private getContainerEnv : ( ) => Promise < boolean > ;
46+ private cachedCommonProperties ?: CommonProperties ;
47+ private flushing : boolean = false ;
2548
2649 private constructor (
2750 private readonly session : Session ,
2851 private readonly userConfig : UserConfig ,
29- private readonly commonProperties : CommonProperties ,
30- { eventCache, getRawMachineId } : { eventCache : EventCache ; getRawMachineId : ( ) => Promise < string > }
52+ {
53+ eventCache,
54+ getRawMachineId,
55+ getContainerEnv,
56+ } : {
57+ eventCache : EventCache ;
58+ getRawMachineId : ( ) => Promise < string > ;
59+ getContainerEnv : ( ) => Promise < boolean > ;
60+ }
3161 ) {
3262 this . eventCache = eventCache ;
3363 this . getRawMachineId = getRawMachineId ;
64+ this . getContainerEnv = getContainerEnv ;
3465 }
3566
3667 static create (
3768 session : Session ,
3869 userConfig : UserConfig ,
3970 {
40- commonProperties = { ...MACHINE_METADATA } ,
4171 eventCache = EventCache . getInstance ( ) ,
4272 getRawMachineId = ( ) => nodeMachineId . machineId ( true ) ,
73+ getContainerEnv = isContainerized ,
4374 } : {
4475 eventCache ?: EventCache ;
4576 getRawMachineId ?: ( ) => Promise < string > ;
46- commonProperties ?: CommonProperties ;
77+ getContainerEnv ?: ( ) => Promise < boolean > ;
4778 } = { }
4879 ) : Telemetry {
49- const instance = new Telemetry ( session , userConfig , commonProperties , { eventCache, getRawMachineId } ) ;
50-
51- void instance . start ( ) ;
52- return instance ;
53- }
54-
55- private async start ( ) : Promise < void > {
56- if ( ! this . isTelemetryEnabled ( ) ) {
57- return ;
58- }
59- this . deviceIdPromise = getDeviceId ( {
60- getMachineId : ( ) => this . getRawMachineId ( ) ,
61- onError : ( reason , error ) => {
62- switch ( reason ) {
63- case "resolutionError" :
64- logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , String ( error ) ) ;
65- break ;
66- case "timeout" :
67- logger . debug ( LogId . telemetryDeviceIdTimeout , "telemetry" , "Device ID retrieval timed out" ) ;
68- break ;
69- case "abort" :
70- // No need to log in the case of aborts
71- break ;
72- }
73- } ,
74- abortSignal : this . deviceIdAbortController . signal ,
80+ const instance = new Telemetry ( session , userConfig , {
81+ eventCache,
82+ getRawMachineId,
83+ getContainerEnv,
7584 } ) ;
7685
77- this . commonProperties . device_id = await this . deviceIdPromise ;
78-
79- this . isBufferingEvents = false ;
86+ return instance ;
8087 }
8188
8289 public async close ( ) : Promise < void > {
8390 this . deviceIdAbortController . abort ( ) ;
84- this . isBufferingEvents = false ;
85- await this . emitEvents ( this . eventCache . getEvents ( ) ) ;
91+ await this . flush ( ) ;
8692 }
8793
8894 /**
8995 * Emits events through the telemetry pipeline
9096 * @param events - The events to emit
9197 */
92- public async emitEvents ( events : BaseEvent [ ] ) : Promise < void > {
93- try {
94- if ( ! this . isTelemetryEnabled ( ) ) {
95- logger . info ( LogId . telemetryEmitFailure , "telemetry" , `Telemetry is disabled.` ) ;
96- return ;
97- }
98-
99- await this . emit ( events ) ;
100- } catch {
101- logger . debug ( LogId . telemetryEmitFailure , "telemetry" , `Error emitting telemetry events.` ) ;
102- }
98+ public emitEvents ( events : BaseEvent [ ] ) : void {
99+ void this . flush ( events ) ;
103100 }
104101
105102 /**
106103 * Gets the common properties for events
107104 * @returns Object containing common properties for all events
108105 */
109- public getCommonProperties ( ) : CommonProperties {
110- return {
111- ...this . commonProperties ,
112- mcp_client_version : this . session . agentRunner ?. version ,
113- mcp_client_name : this . session . agentRunner ?. name ,
114- session_id : this . session . sessionId ,
115- config_atlas_auth : this . session . apiClient . hasCredentials ( ) ? "true" : "false" ,
116- config_connection_string : this . userConfig . connectionString ? "true" : "false" ,
117- } ;
106+ private async getCommonProperties ( ) : Promise < CommonProperties > {
107+ if ( ! this . cachedCommonProperties ) {
108+ let deviceId : string | undefined ;
109+ let containerEnv : boolean | undefined ;
110+ try {
111+ await Promise . all ( [
112+ getDeviceId ( {
113+ getMachineId : ( ) => this . getRawMachineId ( ) ,
114+ onError : ( reason , error ) => {
115+ switch ( reason ) {
116+ case "resolutionError" :
117+ logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , String ( error ) ) ;
118+ break ;
119+ case "timeout" :
120+ logger . debug (
121+ LogId . telemetryDeviceIdTimeout ,
122+ "telemetry" ,
123+ "Device ID retrieval timed out"
124+ ) ;
125+ break ;
126+ case "abort" :
127+ // No need to log in the case of aborts
128+ break ;
129+ }
130+ } ,
131+ abortSignal : this . deviceIdAbortController . signal ,
132+ } ) . then ( ( id ) => {
133+ deviceId = id ;
134+ } ) ,
135+ this . getContainerEnv ( ) . then ( ( env ) => {
136+ containerEnv = env ;
137+ } ) ,
138+ ] ) ;
139+ } catch ( error : unknown ) {
140+ const err = error instanceof Error ? error : new Error ( String ( error ) ) ;
141+ logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , err . message ) ;
142+ }
143+ this . cachedCommonProperties = {
144+ ...MACHINE_METADATA ,
145+ mcp_client_version : this . session . agentRunner ?. version ,
146+ mcp_client_name : this . session . agentRunner ?. name ,
147+ session_id : this . session . sessionId ,
148+ config_atlas_auth : this . session . apiClient . hasCredentials ( ) ? "true" : "false" ,
149+ config_connection_string : this . userConfig . connectionString ? "true" : "false" ,
150+ is_container_env : containerEnv ? "true" : "false" ,
151+ device_id : deviceId ,
152+ } ;
153+ }
154+
155+ return this . cachedCommonProperties ;
118156 }
119157
120158 /**
@@ -135,60 +173,74 @@ export class Telemetry {
135173 }
136174
137175 /**
138- * Attempts to emit events through authenticated and unauthenticated clients
176+ * Attempts to flush events through authenticated and unauthenticated clients
139177 * Falls back to caching if both attempts fail
140178 */
141- private async emit ( events : BaseEvent [ ] ) : Promise < void > {
142- if ( this . isBufferingEvents ) {
143- this . eventCache . appendEvents ( events ) ;
179+ public async flush ( events ? : BaseEvent [ ] ) : Promise < void > {
180+ if ( ! this . isTelemetryEnabled ( ) ) {
181+ logger . info ( LogId . telemetryEmitFailure , "telemetry" , `Telemetry is disabled.` ) ;
144182 return ;
145183 }
146184
147- const cachedEvents = this . eventCache . getEvents ( ) ;
148- const allEvents = [ ...cachedEvents , ...events ] ;
185+ if ( this . flushing ) {
186+ this . eventCache . appendEvents ( events ?? [ ] ) ;
187+ process . nextTick ( async ( ) => {
188+ // try again if in the middle of a flush
189+ await this . flush ( ) ;
190+ } ) ;
191+ return ;
192+ }
149193
150- logger . debug (
151- LogId . telemetryEmitStart ,
152- "telemetry" ,
153- `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)`
154- ) ;
194+ this . flushing = true ;
155195
156- const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
157- if ( result . success ) {
196+ try {
197+ const cachedEvents = this . eventCache . getEvents ( ) ;
198+ const allEvents = [ ...cachedEvents , ...( events ?? [ ] ) ] ;
199+ if ( allEvents . length <= 0 ) {
200+ this . flushing = false ;
201+ return ;
202+ }
203+
204+ logger . debug (
205+ LogId . telemetryEmitStart ,
206+ "telemetry" ,
207+ `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)`
208+ ) ;
209+
210+ await this . sendEvents ( this . session . apiClient , allEvents ) ;
158211 this . eventCache . clearEvents ( ) ;
159212 logger . debug (
160213 LogId . telemetryEmitSuccess ,
161214 "telemetry" ,
162215 `Sent ${ allEvents . length } events successfully: ${ JSON . stringify ( allEvents , null , 2 ) } `
163216 ) ;
164- return ;
217+ } catch ( error : unknown ) {
218+ logger . debug (
219+ LogId . telemetryEmitFailure ,
220+ "telemetry" ,
221+ `Error sending event to client: ${ error instanceof Error ? error . message : String ( error ) } `
222+ ) ;
223+ this . eventCache . appendEvents ( events ?? [ ] ) ;
224+ process . nextTick ( async ( ) => {
225+ // try again
226+ await this . flush ( ) ;
227+ } ) ;
165228 }
166229
167- logger . debug (
168- LogId . telemetryEmitFailure ,
169- "telemetry" ,
170- `Error sending event to client: ${ result . error instanceof Error ? result . error . message : String ( result . error ) } `
171- ) ;
172- this . eventCache . appendEvents ( events ) ;
230+ this . flushing = false ;
173231 }
174232
175233 /**
176234 * Attempts to send events through the provided API client
177235 */
178- private async sendEvents ( client : ApiClient , events : BaseEvent [ ] ) : Promise < EventResult > {
179- try {
180- await client . sendEvents (
181- events . map ( ( event ) => ( {
182- ...event ,
183- properties : { ...this . getCommonProperties ( ) , ...event . properties } ,
184- } ) )
185- ) ;
186- return { success : true } ;
187- } catch ( error ) {
188- return {
189- success : false ,
190- error : error instanceof Error ? error : new Error ( String ( error ) ) ,
191- } ;
192- }
236+ private async sendEvents ( client : ApiClient , events : BaseEvent [ ] ) : Promise < void > {
237+ const commonProperties = await this . getCommonProperties ( ) ;
238+
239+ await client . sendEvents (
240+ events . map ( ( event ) => ( {
241+ ...event ,
242+ properties : { ...commonProperties , ...event . properties } ,
243+ } ) )
244+ ) ;
193245 }
194246}
0 commit comments