@@ -75,6 +75,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
7575 private retryCount = 0 ;
7676 private readonly baseDelayMs = 1000 ;
7777 private readonly maxDelayMs = 30000 ;
78+ private aborted = false ;
7879
7980 constructor ( private options : StreamsWriterV2Options < T > ) {
8081 this . limiter = options . limiter ( 1 ) ;
@@ -88,6 +89,24 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
8889 `[S2MetadataStream] Initializing: basin=${ options . basin } , stream=${ options . stream } , flushIntervalMs=${ this . flushIntervalMs } , maxRetries=${ this . maxRetries } `
8990 ) ;
9091
92+ // Check if already aborted
93+ if ( options . signal ?. aborted ) {
94+ this . aborted = true ;
95+ this . log ( "[S2MetadataStream] Signal already aborted, skipping initialization" ) ;
96+ this . serverStream = new ReadableStream < T > ( ) ;
97+ this . consumerStream = new ReadableStream < T > ( ) ;
98+ this . streamPromise = Promise . resolve ( ) ;
99+ return ;
100+ }
101+
102+ // Set up abort signal handler
103+ if ( options . signal ) {
104+ options . signal . addEventListener ( "abort" , ( ) => {
105+ this . log ( "[S2MetadataStream] Abort signal received" ) ;
106+ this . handleAbort ( ) ;
107+ } ) ;
108+ }
109+
91110 const [ serverStream , consumerStream ] = this . createTeeStreams ( ) ;
92111 this . serverStream = serverStream ;
93112 this . consumerStream = consumerStream ;
@@ -101,6 +120,43 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
101120 this . streamPromise = this . initializeServerStream ( ) ;
102121 }
103122
123+ private handleAbort ( ) : void {
124+ if ( this . aborted ) {
125+ return ; // Already aborted
126+ }
127+
128+ this . aborted = true ;
129+ this . log ( "[S2MetadataStream] Handling abort - cleaning up resources" ) ;
130+
131+ // Clear flush interval
132+ if ( this . flushInterval ) {
133+ clearInterval ( this . flushInterval ) ;
134+ this . flushInterval = null ;
135+ this . log ( "[S2MetadataStream] Cleared flush interval" ) ;
136+ }
137+
138+ // Cancel stream reader
139+ if ( this . streamReader ) {
140+ this . streamReader
141+ . cancel ( "Aborted" )
142+ . catch ( ( error ) => {
143+ this . logError ( "[S2MetadataStream] Error canceling stream reader:" , error ) ;
144+ } )
145+ . finally ( ( ) => {
146+ this . log ( "[S2MetadataStream] Stream reader canceled" ) ;
147+ } ) ;
148+ }
149+
150+ // Clear pending flushes
151+ const pendingCount = this . pendingFlushes . length ;
152+ this . pendingFlushes = [ ] ;
153+ if ( pendingCount > 0 ) {
154+ this . log ( `[S2MetadataStream] Cleared ${ pendingCount } pending flushes` ) ;
155+ }
156+
157+ this . log ( "[S2MetadataStream] Abort cleanup complete" ) ;
158+ }
159+
104160 private createTeeStreams ( ) {
105161 const readableSource = new ReadableStream < T > ( {
106162 start : async ( controller ) => {
@@ -131,6 +187,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
131187 let chunkCount = 0 ;
132188
133189 while ( true ) {
190+ // Check if aborted
191+ if ( this . aborted ) {
192+ this . log ( "[S2MetadataStream] Buffering stopped due to abort signal" ) ;
193+ break ;
194+ }
195+
134196 const { done, value } = await this . streamReader ! . read ( ) ;
135197
136198 if ( done ) {
@@ -139,6 +201,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
139201 break ;
140202 }
141203
204+ // Check again after async read
205+ if ( this . aborted ) {
206+ this . log ( "[S2MetadataStream] Buffering stopped due to abort signal" ) ;
207+ break ;
208+ }
209+
142210 // Add to pending flushes
143211 this . pendingFlushes . push ( value ) ;
144212 chunkCount ++ ;
@@ -166,6 +234,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
166234 }
167235
168236 private async flush ( ) : Promise < void > {
237+ // Don't flush if aborted
238+ if ( this . aborted ) {
239+ this . log ( "[S2MetadataStream] Flush skipped due to abort signal" ) ;
240+ return ;
241+ }
242+
169243 if ( this . pendingFlushes . length === 0 ) {
170244 return ;
171245 }
@@ -227,6 +301,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
227301 // Wait for buffer task and all flushes to complete
228302 await this . bufferReaderTask ;
229303
304+ // Skip final flush if aborted
305+ if ( this . aborted ) {
306+ this . log ( "[S2MetadataStream] Stream initialization aborted" ) ;
307+ return ;
308+ }
309+
230310 this . log (
231311 `[S2MetadataStream] Buffer task complete, performing final flush (${ this . pendingFlushes . length } pending chunks)`
232312 ) ;
0 commit comments