Skip to content

Commit 1afb0eb

Browse files
committed
🤖 perf: remove 10ms wait by consuming Web Streams concurrently
- Replace readline + Node streams with direct Web Streams readers - Read stdout/stderr in parallel; await exitCode + drains via Promise.all - Deterministic line splitting with TextDecoder streaming (handles UTF-8 boundaries) - Early cancel on hard limits; no arbitrary sleeps - Simplifies code and resolves SSH close-event race _Generated with `cmux`_
1 parent bd86eb1 commit 1afb0eb

File tree

1 file changed

+66
-44
lines changed

1 file changed

+66
-44
lines changed

src/services/tools/bash.ts

Lines changed: 66 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { tool } from "ai";
2-
import { createInterface } from "readline";
2+
// NOTE: We avoid readline; consume Web Streams directly to prevent race conditions
33
import * as path from "path";
4-
import { Readable } from "stream";
54
import {
65
BASH_DEFAULT_TIMEOUT_SECS,
76
BASH_HARD_MAX_LINES,
@@ -254,22 +253,9 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
254253
// Use abort() instead of close() for immediate, synchronous closure
255254
// close() is async and waits for acknowledgment, which can hang over SSH
256255
// abort() immediately marks stream as errored and releases locks
257-
execStream.stdin.abort().catch(() => {
258-
// Ignore errors - stream might already be closed
259-
});
260-
261-
// Convert Web Streams to Node.js streams for readline
262-
// Type mismatch between Node.js ReadableStream and Web ReadableStream - safe to cast
263-
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
264-
const stdoutNodeStream = Readable.fromWeb(execStream.stdout as any);
265-
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
266-
const stderrNodeStream = Readable.fromWeb(execStream.stderr as any);
267-
268-
// Set up readline for both stdout and stderr to handle buffering
269-
const stdoutReader = createInterface({ input: stdoutNodeStream });
270-
const stderrReader = createInterface({ input: stderrNodeStream });
256+
execStream.stdin.abort().catch(() => { /* ignore */ return; });
271257

272-
// Collect output
258+
// Collect output concurrently from Web Streams to avoid readline race conditions.
273259
const lines: string[] = [];
274260
let truncated = false;
275261

@@ -287,13 +273,9 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
287273
truncationState.displayTruncated = true;
288274
truncated = true;
289275
overflowReason = reason;
290-
stdoutReader.close();
291-
stderrReader.close();
292-
// Cancel the streams to stop the process
293-
// eslint-disable-next-line @typescript-eslint/no-empty-function
294-
execStream.stdout.cancel().catch(() => {});
295-
// eslint-disable-next-line @typescript-eslint/no-empty-function
296-
execStream.stderr.cancel().catch(() => {});
276+
// Cancel the streams to stop the process and unblock readers
277+
execStream.stdout.cancel().catch(() => { /* ignore */ return; });
278+
execStream.stderr.cancel().catch(() => { /* ignore */ return; });
297279
};
298280

299281
// Create unified line handler for both stdout and stderr
@@ -306,20 +288,70 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
306288
triggerFileTruncation
307289
);
308290

309-
stdoutReader.on("line", lineHandler);
310-
stderrReader.on("line", lineHandler);
291+
// Consume a ReadableStream<Uint8Array> and emit lines to lineHandler.
292+
// Uses TextDecoder streaming to preserve multibyte boundaries.
293+
const consumeStream = async (stream: ReadableStream<Uint8Array>): Promise<void> => {
294+
const reader = stream.getReader();
295+
const decoder = new TextDecoder("utf-8");
296+
let carry = "";
297+
try {
298+
while (true) {
299+
if (truncationState.fileTruncated) {
300+
// Stop early if we already hit hard limits
301+
await reader.cancel().catch(() => { /* ignore */ return; });
302+
break;
303+
}
304+
const { value, done } = await reader.read();
305+
if (done) break;
306+
// Decode chunk (streaming keeps partial code points)
307+
const text = decoder.decode(value, { stream: true });
308+
carry += text;
309+
// Split into lines; support both \n and \r\n
310+
let start = 0;
311+
while (true) {
312+
const idxN = carry.indexOf("\n", start);
313+
const idxR = carry.indexOf("\r", start);
314+
let nextIdx = -1;
315+
if (idxN === -1 && idxR === -1) break;
316+
nextIdx = idxN === -1 ? idxR : idxR === -1 ? idxN : Math.min(idxN, idxR);
317+
const line = carry.slice(0, nextIdx).replace(/\r$/, "");
318+
lineHandler(line);
319+
carry = carry.slice(nextIdx + 1);
320+
start = 0;
321+
if (truncationState.fileTruncated) {
322+
await reader.cancel().catch(() => { /* ignore */ return; });
323+
break;
324+
}
325+
}
326+
if (truncationState.fileTruncated) break;
327+
}
328+
} finally {
329+
// Flush decoder for any trailing bytes and emit the last line (if any)
330+
try {
331+
const tail = decoder.decode();
332+
if (tail) carry += tail;
333+
if (carry.length > 0 && !truncationState.fileTruncated) {
334+
lineHandler(carry);
335+
}
336+
} catch {
337+
// ignore decoder errors on flush
338+
}
339+
}
340+
};
341+
342+
// Start consuming stdout and stderr concurrently
343+
const consumeStdout = consumeStream(execStream.stdout);
344+
const consumeStderr = consumeStream(execStream.stderr);
311345

312-
// Wait for process to exit
346+
// Wait for process exit and stream consumption concurrently
313347
let exitCode: number;
314348
try {
315-
exitCode = await execStream.exitCode;
349+
[exitCode] = await Promise.all([
350+
execStream.exitCode,
351+
consumeStdout,
352+
consumeStderr,
353+
]);
316354
} catch (err: unknown) {
317-
// Cleanup immediately
318-
stdoutReader.close();
319-
stderrReader.close();
320-
stdoutNodeStream.destroy();
321-
stderrNodeStream.destroy();
322-
323355
return {
324356
success: false,
325357
error: `Failed to execute command: ${err instanceof Error ? err.message : String(err)}`,
@@ -328,16 +360,6 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => {
328360
};
329361
}
330362

331-
// Give readline interfaces a moment to process final buffered data
332-
// Process has exited but readline may still be processing buffered chunks
333-
await new Promise((resolve) => setTimeout(resolve, 10));
334-
335-
// Now cleanup
336-
stdoutReader.close();
337-
stderrReader.close();
338-
stdoutNodeStream.destroy();
339-
stderrNodeStream.destroy();
340-
341363
// Round to integer to preserve tokens
342364
const wall_duration_ms = Math.round(performance.now() - startTime);
343365

0 commit comments

Comments
 (0)