@@ -7,16 +7,29 @@ import { MACHINE_METADATA } from "./constants.js";
77import { EventCache } from "./eventCache.js" ;
88import { detectContainerEnv } from "../helpers/container.js" ;
99import type { DeviceId } from "../helpers/deviceId.js" ;
10+ import { EventEmitter } from "stream" ;
1011
1112type EventResult = {
1213 success : boolean ;
1314 error ?: Error ;
1415} ;
1516
17+ async function timeout ( promise : Promise < unknown > , ms : number ) : Promise < void > {
18+ await Promise . race ( [ new Promise ( ( resolve ) => setTimeout ( resolve , ms ) ) , promise ] ) ;
19+ }
20+
21+ export interface TelemetryEvents {
22+ "events-emitted" : [ ] ;
23+ "events-send-failed" : [ ] ;
24+ "events-skipped" : [ ] ;
25+ }
26+
1627export class Telemetry {
1728 private isBufferingEvents : boolean = true ;
1829 /** Resolves when the setup is complete or a timeout occurs */
1930 public setupPromise : Promise < [ string , boolean ] > | undefined ;
31+ public readonly events : EventEmitter < TelemetryEvents > = new EventEmitter ( ) ;
32+
2033 private eventCache : EventCache ;
2134 private deviceId : DeviceId ;
2235
@@ -57,6 +70,12 @@ export class Telemetry {
5770
5871 private async setup ( ) : Promise < void > {
5972 if ( ! this . isTelemetryEnabled ( ) ) {
73+ this . session . logger . info ( {
74+ id : LogId . telemetryEmitFailure ,
75+ context : "telemetry" ,
76+ message : "Telemetry is disabled." ,
77+ noRedaction : true ,
78+ } ) ;
6079 return ;
6180 }
6281
@@ -71,34 +90,22 @@ export class Telemetry {
7190
7291 public async close ( ) : Promise < void > {
7392 this . isBufferingEvents = false ;
74- await this . emitEvents ( this . eventCache . getEvents ( ) ) ;
93+ await timeout ( this . emit ( [ ] ) , 5_000 ) ;
7594 }
7695
7796 /**
7897 * Emits events through the telemetry pipeline
7998 * @param events - The events to emit
8099 */
81- public async emitEvents ( events : BaseEvent [ ] ) : Promise < void > {
82- try {
83- if ( ! this . isTelemetryEnabled ( ) ) {
84- this . session . logger . info ( {
85- id : LogId . telemetryEmitFailure ,
86- context : "telemetry" ,
87- message : "Telemetry is disabled." ,
88- noRedaction : true ,
89- } ) ;
90- return ;
91- }
92-
93- await this . emit ( events ) ;
94- } catch {
95- this . session . logger . debug ( {
96- id : LogId . telemetryEmitFailure ,
97- context : "telemetry" ,
98- message : "Error emitting telemetry events." ,
99- noRedaction : true ,
100- } ) ;
100+ public emitEvents ( events : BaseEvent [ ] ) : void {
101+ if ( ! this . isTelemetryEnabled ( ) ) {
102+ this . events . emit ( "events-skipped" ) ;
103+ return ;
101104 }
105+
106+ // Don't wait for events to be sent - we should not block regular server
107+ // operations on telemetry
108+ void this . emit ( events ) ;
102109 }
103110
104111 /**
@@ -144,32 +151,44 @@ export class Telemetry {
144151 return ;
145152 }
146153
147- const cachedEvents = this . eventCache . getEvents ( ) ;
148- const allEvents = [ ...cachedEvents , ...events ] ;
154+ try {
155+ const cachedEvents = this . eventCache . getEvents ( ) ;
156+ const allEvents = [ ...cachedEvents , ...events ] ;
149157
150- this . session . logger . debug ( {
151- id : LogId . telemetryEmitStart ,
152- context : "telemetry" ,
153- message : `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)` ,
154- } ) ;
158+ this . session . logger . debug ( {
159+ id : LogId . telemetryEmitStart ,
160+ context : "telemetry" ,
161+ message : `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)` ,
162+ } ) ;
163+
164+ const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
165+ if ( result . success ) {
166+ this . eventCache . clearEvents ( ) ;
167+ this . session . logger . debug ( {
168+ id : LogId . telemetryEmitSuccess ,
169+ context : "telemetry" ,
170+ message : `Sent ${ allEvents . length } events successfully: ${ JSON . stringify ( allEvents , null , 2 ) } ` ,
171+ } ) ;
172+ this . events . emit ( "events-emitted" ) ;
173+ return ;
174+ }
155175
156- const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
157- if ( result . success ) {
158- this . eventCache . clearEvents ( ) ;
159176 this . session . logger . debug ( {
160- id : LogId . telemetryEmitSuccess ,
177+ id : LogId . telemetryEmitFailure ,
161178 context : "telemetry" ,
162- message : `Sent ${ allEvents . length } events successfully : ${ JSON . stringify ( allEvents , null , 2 ) } ` ,
179+ message : `Error sending event to client : ${ result . error instanceof Error ? result . error . message : String ( result . error ) } ` ,
163180 } ) ;
164- return ;
181+ this . eventCache . appendEvents ( events ) ;
182+ this . events . emit ( "events-send-failed" ) ;
183+ } catch ( error ) {
184+ this . session . logger . debug ( {
185+ id : LogId . telemetryEmitFailure ,
186+ context : "telemetry" ,
187+ message : `Error emitting telemetry events: ${ error instanceof Error ? error . message : String ( error ) } ` ,
188+ noRedaction : true ,
189+ } ) ;
190+ this . events . emit ( "events-send-failed" ) ;
165191 }
166-
167- this . session . logger . debug ( {
168- id : LogId . telemetryEmitFailure ,
169- context : "telemetry" ,
170- message : `Error sending event to client: ${ result . error instanceof Error ? result . error . message : String ( result . error ) } ` ,
171- } ) ;
172- this . eventCache . appendEvents ( events ) ;
173192 }
174193
175194 /**
0 commit comments