Skip to content

Commit 7ca7873

Browse files
committed
🤖 perf: remove 10ms wait by consuming Web Streams concurrently
- Replace readline + Node streams with direct Web Streams readers - Read stdout/stderr in parallel; await exitCode + drains via Promise.all - Deterministic line splitting with TextDecoder streaming (handles UTF-8 boundaries) - Early cancel on hard limits; no arbitrary sleeps - Simplifies code and resolves SSH close-event race _Generated with `cmux`_
1 parent bd86eb1 commit 7ca7873

File tree

1 file changed

+70
-42
lines changed

1 file changed

+70
-42
lines changed

src/services/tools/bash.ts

Lines changed: 70 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { tool } from "ai";
2-
import { createInterface } from "readline";
2+
// NOTE: We avoid readline; consume Web Streams directly to prevent race conditions
33
import * as path from "path";
4-
import { Readable } from "stream";
54
import {
65
BASH_DEFAULT_TIMEOUT_SECS,
76
BASH_HARD_MAX_LINES,
@@ -255,21 +254,10 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
255254
// close() is async and waits for acknowledgment, which can hang over SSH
256255
// abort() immediately marks stream as errored and releases locks
257256
execStream.stdin.abort().catch(() => {
258-
// Ignore errors - stream might already be closed
257+
/* ignore */ return;
259258
});
260259

261-
// Convert Web Streams to Node.js streams for readline
262-
// Type mismatch between Node.js ReadableStream and Web ReadableStream - safe to cast
263-
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
264-
const stdoutNodeStream = Readable.fromWeb(execStream.stdout as any);
265-
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
266-
const stderrNodeStream = Readable.fromWeb(execStream.stderr as any);
267-
268-
// Set up readline for both stdout and stderr to handle buffering
269-
const stdoutReader = createInterface({ input: stdoutNodeStream });
270-
const stderrReader = createInterface({ input: stderrNodeStream });
271-
272-
// Collect output
260+
// Collect output concurrently from Web Streams to avoid readline race conditions.
273261
const lines: string[] = [];
274262
let truncated = false;
275263

@@ -287,13 +275,13 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
287275
truncationState.displayTruncated = true;
288276
truncated = true;
289277
overflowReason = reason;
290-
stdoutReader.close();
291-
stderrReader.close();
292-
// Cancel the streams to stop the process
293-
// eslint-disable-next-line @typescript-eslint/no-empty-function
294-
execStream.stdout.cancel().catch(() => {});
295-
// eslint-disable-next-line @typescript-eslint/no-empty-function
296-
execStream.stderr.cancel().catch(() => {});
278+
// Cancel the streams to stop the process and unblock readers
279+
execStream.stdout.cancel().catch(() => {
280+
/* ignore */ return;
281+
});
282+
execStream.stderr.cancel().catch(() => {
283+
/* ignore */ return;
284+
});
297285
};
298286

299287
// Create unified line handler for both stdout and stderr
@@ -306,20 +294,70 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
306294
triggerFileTruncation
307295
);
308296

309-
stdoutReader.on("line", lineHandler);
310-
stderrReader.on("line", lineHandler);
297+
// Consume a ReadableStream<Uint8Array> and emit lines to lineHandler.
298+
// Uses TextDecoder streaming to preserve multibyte boundaries.
299+
const consumeStream = async (stream: ReadableStream<Uint8Array>): Promise<void> => {
300+
const reader = stream.getReader();
301+
const decoder = new TextDecoder("utf-8");
302+
let carry = "";
303+
try {
304+
while (true) {
305+
if (truncationState.fileTruncated) {
306+
// Stop early if we already hit hard limits
307+
await reader.cancel().catch(() => {
308+
/* ignore */ return;
309+
});
310+
break;
311+
}
312+
const { value, done } = await reader.read();
313+
if (done) break;
314+
// Decode chunk (streaming keeps partial code points)
315+
const text = decoder.decode(value, { stream: true });
316+
carry += text;
317+
// Split into lines; support both \n and \r\n
318+
let start = 0;
319+
while (true) {
320+
const idxN = carry.indexOf("\n", start);
321+
const idxR = carry.indexOf("\r", start);
322+
let nextIdx = -1;
323+
if (idxN === -1 && idxR === -1) break;
324+
nextIdx = idxN === -1 ? idxR : idxR === -1 ? idxN : Math.min(idxN, idxR);
325+
const line = carry.slice(0, nextIdx).replace(/\r$/, "");
326+
lineHandler(line);
327+
carry = carry.slice(nextIdx + 1);
328+
start = 0;
329+
if (truncationState.fileTruncated) {
330+
await reader.cancel().catch(() => {
331+
/* ignore */ return;
332+
});
333+
break;
334+
}
335+
}
336+
if (truncationState.fileTruncated) break;
337+
}
338+
} finally {
339+
// Flush decoder for any trailing bytes and emit the last line (if any)
340+
try {
341+
const tail = decoder.decode();
342+
if (tail) carry += tail;
343+
if (carry.length > 0 && !truncationState.fileTruncated) {
344+
lineHandler(carry);
345+
}
346+
} catch {
347+
// ignore decoder errors on flush
348+
}
349+
}
350+
};
311351

312-
// Wait for process to exit
352+
// Start consuming stdout and stderr concurrently
353+
const consumeStdout = consumeStream(execStream.stdout);
354+
const consumeStderr = consumeStream(execStream.stderr);
355+
356+
// Wait for process exit and stream consumption concurrently
313357
let exitCode: number;
314358
try {
315-
exitCode = await execStream.exitCode;
359+
[exitCode] = await Promise.all([execStream.exitCode, consumeStdout, consumeStderr]);
316360
} catch (err: unknown) {
317-
// Cleanup immediately
318-
stdoutReader.close();
319-
stderrReader.close();
320-
stdoutNodeStream.destroy();
321-
stderrNodeStream.destroy();
322-
323361
return {
324362
success: false,
325363
error: `Failed to execute command: ${err instanceof Error ? err.message : String(err)}`,
@@ -328,16 +366,6 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
328366
};
329367
}
330368

331-
// Give readline interfaces a moment to process final buffered data
332-
// Process has exited but readline may still be processing buffered chunks
333-
await new Promise((resolve) => setTimeout(resolve, 10));
334-
335-
// Now cleanup
336-
stdoutReader.close();
337-
stderrReader.close();
338-
stdoutNodeStream.destroy();
339-
stderrNodeStream.destroy();
340-
341369
// Round to integer to preserve tokens
342370
const wall_duration_ms = Math.round(performance.now() - startTime);
343371

0 commit comments

Comments
 (0)