Skip to content

Commit b10dff9

Browse files
committed
properly abort streams when the waitUntil timeout occurs
1 parent a9893f0 commit b10dff9

File tree

2 files changed

+49
-31
lines changed

2 files changed

+49
-31
lines changed

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1919
private baseUrl: string,
2020
private debug: boolean = false
2121
) {}
22-
// Add a Map to track active streams
23-
private activeStreams = new Map<string, { wait: () => Promise<void> }>();
22+
// Add a Map to track active streams with their abort controllers
23+
private activeStreams = new Map<
24+
string,
25+
{ wait: () => Promise<void>; abortController: AbortController }
26+
>();
2427

2528
reset(): void {
2629
this.activeStreams.clear();
@@ -51,6 +54,13 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5154

5255
const parsedResponse = parseCreateStreamResponse(version, headers);
5356

57+
// Create an AbortController for this stream
58+
const abortController = new AbortController();
59+
// Chain with user-provided signal if present
60+
const combinedSignal = options?.signal
61+
? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal
62+
: abortController.signal;
63+
5464
const streamInstance =
5565
parsedResponse.version === "v1"
5666
? new StreamsWriterV1({
@@ -59,7 +69,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5969
source: asyncIterableSource,
6070
baseUrl: this.baseUrl,
6171
headers: this.apiClient.getHeaders(),
62-
signal: options?.signal,
72+
signal: combinedSignal,
6373
version,
6474
target: "self",
6575
})
@@ -68,12 +78,12 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
6878
stream: key,
6979
accessToken: parsedResponse.accessToken,
7080
source: asyncIterableSource,
71-
signal: options?.signal,
81+
signal: combinedSignal,
7282
limiter: (await import("p-limit")).default,
7383
debug: this.debug,
7484
});
7585

76-
this.activeStreams.set(key, streamInstance);
86+
this.activeStreams.set(key, { wait: () => streamInstance.wait(), abortController });
7787

7888
// Clean up when stream completes
7989
streamInstance.wait().finally(() => this.activeStreams.delete(key));
@@ -98,21 +108,31 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
98108

99109
const promises = Array.from(this.activeStreams.values()).map((stream) => stream.wait());
100110

101-
try {
102-
await Promise.race([
103-
Promise.allSettled(promises),
104-
new Promise<void>((resolve, _) => setTimeout(() => resolve(), timeout)),
105-
]);
106-
} catch (error) {
107-
console.error("Error waiting for streams to finish:", error);
108-
109-
// If we time out, abort all remaining streams
110-
for (const [key, promise] of this.activeStreams.entries()) {
111-
// We can add abort logic here if needed
111+
// Create a timeout promise that resolves to a special sentinel value
112+
const TIMEOUT_SENTINEL = Symbol("timeout");
113+
const timeoutPromise = new Promise<typeof TIMEOUT_SENTINEL>((resolve) =>
114+
setTimeout(() => resolve(TIMEOUT_SENTINEL), timeout)
115+
);
116+
117+
// Race between all streams completing/rejecting and the timeout
118+
const result = await Promise.race([Promise.all(promises), timeoutPromise]);
119+
120+
// Check if we timed out
121+
if (result === TIMEOUT_SENTINEL) {
122+
// Timeout occurred - abort all active streams
123+
const abortedCount = this.activeStreams.size;
124+
for (const [key, streamInfo] of this.activeStreams.entries()) {
125+
streamInfo.abortController.abort();
112126
this.activeStreams.delete(key);
113127
}
114-
throw error;
128+
129+
throw new Error(
130+
`Timeout waiting for streams to finish after ${timeout}ms. Aborted ${abortedCount} active stream(s).`
131+
);
115132
}
133+
134+
// If we reach here, Promise.all completed (either all resolved or one rejected)
135+
// Any rejection from Promise.all will have already propagated
116136
}
117137
}
118138

references/realtime-streams/src/trigger/streams.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,26 +117,24 @@ export const streamsTask = task({
117117

118118
await setTimeout(1000);
119119

120-
const stream = await streams.read(ctx.run.id, "stream", {
121-
timeoutInSeconds: 10,
122-
startIndex: 10,
123-
});
124-
125-
let tokenCount = 0;
126-
for await (const chunk of stream) {
127-
console.log(chunk);
128-
tokenCount++;
129-
}
120+
// const stream = await streams.read(ctx.run.id, "stream", {
121+
// timeoutInSeconds: 10,
122+
// startIndex: 10,
123+
// });
124+
125+
// let tokenCount = 0;
126+
// for await (const chunk of stream) {
127+
// console.log(chunk);
128+
// tokenCount++;
129+
// }
130130

131-
await waitUntilComplete();
131+
// await waitUntilComplete();
132132

133-
logger.info("Stream completed", { scenario, tokenCount });
133+
logger.info("Stream completed", { scenario });
134134

135135
return {
136136
scenario,
137137
scenarioDescription,
138-
tokenCount,
139-
message: `Completed ${scenario} scenario with ${tokenCount} tokens`,
140138
};
141139
},
142140
});

0 commit comments

Comments
 (0)