Skip to content

Commit 8f93d86

Browse files
committed
prevent memory leaks by cleaning up responses and requests
1 parent 377dbc7 commit 8f93d86

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

packages/core/src/v3/realtimeStreams/streamsWriterV1.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ export class StreamsWriterV1<T> implements StreamsWriter {
131131
if (this.isRetryableError(error)) {
132132
if (this.retryCount < this.maxRetries) {
133133
this.retryCount++;
134+
135+
// Clean up the current request to avoid socket leaks
136+
req.destroy();
137+
134138
const delayMs = this.calculateBackoffDelay();
135139

136140
await this.delay(delayMs);
@@ -153,6 +157,10 @@ export class StreamsWriterV1<T> implements StreamsWriter {
153157
// Timeout is retryable
154158
if (this.retryCount < this.maxRetries) {
155159
this.retryCount++;
160+
161+
// Clean up the current request to avoid socket leaks
162+
req.destroy();
163+
156164
const delayMs = this.calculateBackoffDelay();
157165

158166
await this.delay(delayMs);
@@ -165,6 +173,7 @@ export class StreamsWriterV1<T> implements StreamsWriter {
165173
return;
166174
}
167175

176+
req.destroy();
168177
reject(new Error("Request timed out"));
169178
});
170179

@@ -173,6 +182,13 @@ export class StreamsWriterV1<T> implements StreamsWriter {
173182
if (res.statusCode && this.isRetryableStatusCode(res.statusCode)) {
174183
if (this.retryCount < this.maxRetries) {
175184
this.retryCount++;
185+
186+
// Drain and destroy the response and request to avoid socket leaks
187+
// We need to consume the response before destroying it
188+
res.resume(); // Start draining the response
189+
res.destroy(); // Destroy the response to free the socket
190+
req.destroy(); // Destroy the request as well
191+
176192
const delayMs = this.calculateBackoffDelay();
177193

178194
await this.delay(delayMs);
@@ -185,6 +201,8 @@ export class StreamsWriterV1<T> implements StreamsWriter {
185201
return;
186202
}
187203

204+
res.destroy();
205+
req.destroy();
188206
reject(
189207
new Error(`Max retries (${this.maxRetries}) exceeded for status code ${res.statusCode}`)
190208
);
@@ -193,6 +211,8 @@ export class StreamsWriterV1<T> implements StreamsWriter {
193211

194212
// Non-retryable error status
195213
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
214+
res.destroy();
215+
req.destroy();
196216
const error = new Error(`HTTP error! status: ${res.statusCode}`);
197217
reject(error);
198218
return;
@@ -369,12 +389,16 @@ export class StreamsWriterV1<T> implements StreamsWriter {
369389

370390
req.on("error", async (error) => {
371391
if (this.isRetryableError(error) && attempt < maxHeadRetries) {
392+
// Clean up the current request to avoid socket leaks
393+
req.destroy();
394+
372395
await this.delay(1000 * (attempt + 1)); // Simple linear backoff
373396
const result = await this.queryServerLastChunkIndex(attempt + 1);
374397
resolve(result);
375398
return;
376399
}
377400

401+
req.destroy();
378402
// Return -1 to indicate we don't know what the server has (resume from 0)
379403
resolve(-1);
380404
});
@@ -396,18 +420,27 @@ export class StreamsWriterV1<T> implements StreamsWriter {
396420
// Retry on 5xx errors
397421
if (res.statusCode && this.isRetryableStatusCode(res.statusCode)) {
398422
if (attempt < maxHeadRetries) {
423+
// Drain and destroy the response and request to avoid socket leaks
424+
res.resume();
425+
res.destroy();
426+
req.destroy();
427+
399428
await this.delay(1000 * (attempt + 1));
400429
const result = await this.queryServerLastChunkIndex(attempt + 1);
401430
resolve(result);
402431
return;
403432
}
404433

434+
res.destroy();
435+
req.destroy();
405436
resolve(-1);
406437
return;
407438
}
408439

409440
// Non-retryable error
410441
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
442+
res.destroy();
443+
req.destroy();
411444
resolve(-1);
412445
return;
413446
}

references/realtime-streams/src/trigger/streams.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export const streamsTask = task({
6464
break;
6565
}
6666
case "continuous": {
67-
const durationSec = payload.durationSec ?? 10;
67+
const durationSec = payload.durationSec ?? 45;
6868
const intervalMs = payload.intervalMs ?? 10;
6969
generator = generateContinuousTokenStream(durationSec, intervalMs);
7070
scenarioDescription = `Continuous scenario: ${durationSec}s with ${intervalMs}ms intervals`;

0 commit comments

Comments
 (0)