Skip to content

Commit 849550e

Browse files
committed
Make the MetadataStream client more robust to failure and add tests
1 parent a12684d commit 849550e

File tree

2 files changed

+1071
-18
lines changed

2 files changed

+1071
-18
lines changed

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 93 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ export class MetadataStream<T> {
122122
});
123123

124124
req.on("error", async (error) => {
125+
const errorCode = "code" in error ? error.code : undefined;
126+
const errorMsg = error instanceof Error ? error.message : String(error);
127+
125128
// Check if this is a retryable connection error
126129
if (this.isRetryableError(error)) {
127130
if (this.retryCount < this.maxRetries) {
@@ -144,28 +147,56 @@ export class MetadataStream<T> {
144147
reject(error);
145148
});
146149

147-
req.on("timeout", () => {
148-
req.destroy(new Error("Request timed out"));
150+
req.on("timeout", async () => {
151+
// Timeout is retryable
152+
if (this.retryCount < this.maxRetries) {
153+
this.retryCount++;
154+
const delayMs = this.calculateBackoffDelay();
155+
156+
await this.delay(delayMs);
157+
158+
// Query server to find where to resume
159+
const serverLastChunk = await this.queryServerLastChunkIndex();
160+
const resumeFromChunk = serverLastChunk + 1;
161+
162+
resolve(this.makeRequest(resumeFromChunk));
163+
return;
164+
}
165+
166+
reject(new Error("Request timed out"));
149167
});
150168

151-
req.on("response", (res) => {
152-
if (res.statusCode === 408) {
169+
req.on("response", async (res) => {
170+
// Check for retryable status codes (408, 429, 500, 502, 503, 504)
171+
if (res.statusCode && this.isRetryableStatusCode(res.statusCode)) {
153172
if (this.retryCount < this.maxRetries) {
154173
this.retryCount++;
155-
resolve(this.makeRequest(startFromChunk));
174+
const delayMs = this.calculateBackoffDelay();
175+
176+
await this.delay(delayMs);
177+
178+
// Query server to find where to resume (in case some data was written)
179+
const serverLastChunk = await this.queryServerLastChunkIndex();
180+
const resumeFromChunk = serverLastChunk + 1;
181+
182+
resolve(this.makeRequest(resumeFromChunk));
156183
return;
157184
}
158-
reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
185+
186+
reject(
187+
new Error(`Max retries (${this.maxRetries}) exceeded for status code ${res.statusCode}`)
188+
);
159189
return;
160190
}
161191

192+
// Non-retryable error status
162193
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
163194
const error = new Error(`HTTP error! status: ${res.statusCode}`);
164195
reject(error);
165196
return;
166197
}
167198

168-
// Reset retry count on successful response
199+
// Success! Reset retry count
169200
this.retryCount = 0;
170201

171202
res.on("end", () => {
@@ -195,12 +226,6 @@ export class MetadataStream<T> {
195226
const stringified = JSON.stringify(chunk.data) + "\n";
196227
req.write(stringified);
197228
this.currentChunkIndex = lastSentIndex + 1;
198-
} else {
199-
// Chunk not in buffer (outside ring buffer window)
200-
// This can happen if the ring buffer size is too small
201-
console.warn(
202-
`[metadataStream] Chunk ${lastSentIndex} not in ring buffer (outside window), cannot recover`
203-
);
204229
}
205230
}
206231

@@ -259,6 +284,8 @@ export class MetadataStream<T> {
259284
"ETIMEDOUT", // Connection timed out
260285
"ENOTFOUND", // DNS lookup failed
261286
"EPIPE", // Broken pipe
287+
"EHOSTUNREACH", // Host unreachable
288+
"ENETUNREACH", // Network unreachable
262289
"socket hang up", // Socket hang up
263290
];
264291

@@ -275,6 +302,18 @@ export class MetadataStream<T> {
275302
return false;
276303
}
277304

305+
/**
 * Reports whether an HTTP response status should be retried.
 *
 * Only 408, 429, 500, 502, 503 and 504 are accepted; every other status,
 * including other 5xx codes such as 501, returns false.
 *
 * @param statusCode - HTTP status code of the response.
 * @returns true when the status is one of the codes listed above, else false.
 */
private isRetryableStatusCode(statusCode: number): boolean {
306+
// Explicit allow-list: two 4xx codes plus selected 5xx codes (not every 5xx)
307+
if (statusCode === 408) return true; // Request Timeout
308+
if (statusCode === 429) return true; // Too Many Requests (rate limited)
309+
if (statusCode === 500) return true; // Internal Server Error
310+
if (statusCode === 502) return true; // Bad Gateway
311+
if (statusCode === 503) return true; // Service Unavailable
312+
if (statusCode === 504) return true; // Gateway Timeout
313+
314+
return false;
315+
}
316+
278317
/**
 * Returns a promise that resolves after `ms` milliseconds, using setTimeout.
 *
 * @param ms - Time to wait, in milliseconds.
 * @returns A promise that resolves with no value once the timer fires.
 */
private async delay(ms: number): Promise<void> {
279318
return new Promise((resolve) => setTimeout(resolve, ms));
280319
}
@@ -314,9 +353,10 @@ export class MetadataStream<T> {
314353
return result;
315354
}
316355

317-
private async queryServerLastChunkIndex(): Promise<number> {
356+
private async queryServerLastChunkIndex(attempt: number = 0): Promise<number> {
318357
return new Promise((resolve, reject) => {
319358
const url = new URL(this.buildUrl());
359+
const maxHeadRetries = 3; // Separate retry limit for HEAD requests
320360

321361
const requestFn = url.protocol === "https:" ? httpsRequest : httpRequest;
322362
const req = requestFn({
@@ -331,17 +371,52 @@ export class MetadataStream<T> {
331371
timeout: 5000, // 5 second timeout for HEAD request
332372
});
333373

334-
req.on("error", (error) => {
335-
// Return -1 to indicate we don't know what the server has
374+
req.on("error", async (error) => {
375+
if (this.isRetryableError(error) && attempt < maxHeadRetries) {
376+
await this.delay(1000 * (attempt + 1)); // Simple linear backoff
377+
const result = await this.queryServerLastChunkIndex(attempt + 1);
378+
resolve(result);
379+
return;
380+
}
381+
382+
// Return -1 to indicate we don't know what the server has (resume from 0)
336383
resolve(-1);
337384
});
338385

339-
req.on("timeout", () => {
386+
req.on("timeout", async () => {
340387
req.destroy();
388+
389+
if (attempt < maxHeadRetries) {
390+
await this.delay(1000 * (attempt + 1));
391+
const result = await this.queryServerLastChunkIndex(attempt + 1);
392+
resolve(result);
393+
return;
394+
}
395+
341396
resolve(-1);
342397
});
343398

344-
req.on("response", (res) => {
399+
req.on("response", async (res) => {
400+
// Retry on retryable status codes (408, 429, 500, 502, 503, 504)
401+
if (res.statusCode && this.isRetryableStatusCode(res.statusCode)) {
402+
if (attempt < maxHeadRetries) {
403+
await this.delay(1000 * (attempt + 1));
404+
const result = await this.queryServerLastChunkIndex(attempt + 1);
405+
resolve(result);
406+
return;
407+
}
408+
409+
resolve(-1);
410+
return;
411+
}
412+
413+
// Non-retryable error
414+
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
415+
resolve(-1);
416+
return;
417+
}
418+
419+
// Success - extract chunk index
345420
const lastChunkHeader = res.headers["x-last-chunk-index"];
346421
if (lastChunkHeader) {
347422
const lastChunkIndex = parseInt(

0 commit comments

Comments
 (0)