Skip to content

Commit 442979f

Browse files
committed
s2 stream writer now handles abort signals
1 parent 440f2e7 commit 442979f

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
7575
private retryCount = 0;
7676
private readonly baseDelayMs = 1000;
7777
private readonly maxDelayMs = 30000;
78+
private aborted = false;
7879

7980
constructor(private options: StreamsWriterV2Options<T>) {
8081
this.limiter = options.limiter(1);
@@ -88,6 +89,24 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
8889
`[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxRetries=${this.maxRetries}`
8990
);
9091

92+
// Check if already aborted
93+
if (options.signal?.aborted) {
94+
this.aborted = true;
95+
this.log("[S2MetadataStream] Signal already aborted, skipping initialization");
96+
this.serverStream = new ReadableStream<T>();
97+
this.consumerStream = new ReadableStream<T>();
98+
this.streamPromise = Promise.resolve();
99+
return;
100+
}
101+
102+
// Set up abort signal handler
103+
if (options.signal) {
104+
options.signal.addEventListener("abort", () => {
105+
this.log("[S2MetadataStream] Abort signal received");
106+
this.handleAbort();
107+
});
108+
}
109+
91110
const [serverStream, consumerStream] = this.createTeeStreams();
92111
this.serverStream = serverStream;
93112
this.consumerStream = consumerStream;
@@ -101,6 +120,43 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
101120
this.streamPromise = this.initializeServerStream();
102121
}
103122

123+
private handleAbort(): void {
124+
if (this.aborted) {
125+
return; // Already aborted
126+
}
127+
128+
this.aborted = true;
129+
this.log("[S2MetadataStream] Handling abort - cleaning up resources");
130+
131+
// Clear flush interval
132+
if (this.flushInterval) {
133+
clearInterval(this.flushInterval);
134+
this.flushInterval = null;
135+
this.log("[S2MetadataStream] Cleared flush interval");
136+
}
137+
138+
// Cancel stream reader
139+
if (this.streamReader) {
140+
this.streamReader
141+
.cancel("Aborted")
142+
.catch((error) => {
143+
this.logError("[S2MetadataStream] Error canceling stream reader:", error);
144+
})
145+
.finally(() => {
146+
this.log("[S2MetadataStream] Stream reader canceled");
147+
});
148+
}
149+
150+
// Clear pending flushes
151+
const pendingCount = this.pendingFlushes.length;
152+
this.pendingFlushes = [];
153+
if (pendingCount > 0) {
154+
this.log(`[S2MetadataStream] Cleared ${pendingCount} pending flushes`);
155+
}
156+
157+
this.log("[S2MetadataStream] Abort cleanup complete");
158+
}
159+
104160
private createTeeStreams() {
105161
const readableSource = new ReadableStream<T>({
106162
start: async (controller) => {
@@ -131,6 +187,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
131187
let chunkCount = 0;
132188

133189
while (true) {
190+
// Check if aborted
191+
if (this.aborted) {
192+
this.log("[S2MetadataStream] Buffering stopped due to abort signal");
193+
break;
194+
}
195+
134196
const { done, value } = await this.streamReader!.read();
135197

136198
if (done) {
@@ -139,6 +201,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
139201
break;
140202
}
141203

204+
// Check again after async read
205+
if (this.aborted) {
206+
this.log("[S2MetadataStream] Buffering stopped due to abort signal");
207+
break;
208+
}
209+
142210
// Add to pending flushes
143211
this.pendingFlushes.push(value);
144212
chunkCount++;
@@ -166,6 +234,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
166234
}
167235

168236
private async flush(): Promise<void> {
237+
// Don't flush if aborted
238+
if (this.aborted) {
239+
this.log("[S2MetadataStream] Flush skipped due to abort signal");
240+
return;
241+
}
242+
169243
if (this.pendingFlushes.length === 0) {
170244
return;
171245
}
@@ -227,6 +301,12 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
227301
// Wait for buffer task and all flushes to complete
228302
await this.bufferReaderTask;
229303

304+
// Skip final flush if aborted
305+
if (this.aborted) {
306+
this.log("[S2MetadataStream] Stream initialization aborted");
307+
return;
308+
}
309+
230310
this.log(
231311
`[S2MetadataStream] Buffer task complete, performing final flush (${this.pendingFlushes.length} pending chunks)`
232312
);

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ async function append<T>(
7676
} catch (error) {
7777
// if the error is a signal abort error, we need to end the span but not record an exception
7878
if (error instanceof Error && error.name === "AbortError") {
79+
span.end();
7980
throw error;
8081
}
8182

0 commit comments

Comments
 (0)