@@ -19,8 +19,11 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1919 private baseUrl : string ,
2020 private debug : boolean = false
2121 ) { }
22- // Add a Map to track active streams
23- private activeStreams = new Map < string , { wait : ( ) => Promise < void > } > ( ) ;
22+ // Add a Map to track active streams with their abort controllers
23+ private activeStreams = new Map <
24+ string ,
25+ { wait : ( ) => Promise < void > ; abortController : AbortController }
26+ > ( ) ;
2427
2528 reset ( ) : void {
2629 this . activeStreams . clear ( ) ;
@@ -51,6 +54,13 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5154
5255 const parsedResponse = parseCreateStreamResponse ( version , headers ) ;
5356
57+ // Create an AbortController for this stream
58+ const abortController = new AbortController ( ) ;
59+ // Chain with user-provided signal if present
60+ const combinedSignal = options ?. signal
61+ ? AbortSignal . any ?.( [ options . signal , abortController . signal ] ) ?? abortController . signal
62+ : abortController . signal ;
63+
5464 const streamInstance =
5565 parsedResponse . version === "v1"
5666 ? new StreamsWriterV1 ( {
@@ -59,7 +69,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5969 source : asyncIterableSource ,
6070 baseUrl : this . baseUrl ,
6171 headers : this . apiClient . getHeaders ( ) ,
62- signal : options ?. signal ,
72+ signal : combinedSignal ,
6373 version,
6474 target : "self" ,
6575 } )
@@ -68,12 +78,12 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
6878 stream : key ,
6979 accessToken : parsedResponse . accessToken ,
7080 source : asyncIterableSource ,
71- signal : options ?. signal ,
81+ signal : combinedSignal ,
7282 limiter : ( await import ( "p-limit" ) ) . default ,
7383 debug : this . debug ,
7484 } ) ;
7585
76- this . activeStreams . set ( key , streamInstance ) ;
86+ this . activeStreams . set ( key , { wait : ( ) => streamInstance . wait ( ) , abortController } ) ;
7787
7888 // Clean up when stream completes
7989 streamInstance . wait ( ) . finally ( ( ) => this . activeStreams . delete ( key ) ) ;
@@ -98,21 +108,31 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
98108
99109 const promises = Array . from ( this . activeStreams . values ( ) ) . map ( ( stream ) => stream . wait ( ) ) ;
100110
101- try {
102- await Promise . race ( [
103- Promise . allSettled ( promises ) ,
104- new Promise < void > ( ( resolve , _ ) => setTimeout ( ( ) => resolve ( ) , timeout ) ) ,
105- ] ) ;
106- } catch ( error ) {
107- console . error ( "Error waiting for streams to finish:" , error ) ;
108-
109- // If we time out, abort all remaining streams
110- for ( const [ key , promise ] of this . activeStreams . entries ( ) ) {
111- // We can add abort logic here if needed
111+ // Create a timeout promise that resolves to a special sentinel value
112+ const TIMEOUT_SENTINEL = Symbol ( "timeout" ) ;
113+ const timeoutPromise = new Promise < typeof TIMEOUT_SENTINEL > ( ( resolve ) =>
114+ setTimeout ( ( ) => resolve ( TIMEOUT_SENTINEL ) , timeout )
115+ ) ;
116+
117+ // Race between all streams completing/rejecting and the timeout
118+ const result = await Promise . race ( [ Promise . all ( promises ) , timeoutPromise ] ) ;
119+
120+ // Check if we timed out
121+ if ( result === TIMEOUT_SENTINEL ) {
122+ // Timeout occurred - abort all active streams
123+ const abortedCount = this . activeStreams . size ;
124+ for ( const [ key , streamInfo ] of this . activeStreams . entries ( ) ) {
125+ streamInfo . abortController . abort ( ) ;
112126 this . activeStreams . delete ( key ) ;
113127 }
114- throw error ;
128+
129+ throw new Error (
130+ `Timeout waiting for streams to finish after ${ timeout } ms. Aborted ${ abortedCount } active stream(s).`
131+ ) ;
115132 }
133+
134+ // If we reach here, Promise.all completed (either all resolved or one rejected)
135+ // Any rejection from Promise.all will have already propagated
116136 }
117137}
118138
0 commit comments