diff --git a/src/runtime/LocalRuntime.ts b/src/runtime/LocalRuntime.ts index d62f4bd60..e64c162ab 100644 --- a/src/runtime/LocalRuntime.ts +++ b/src/runtime/LocalRuntime.ts @@ -21,7 +21,7 @@ import { NON_INTERACTIVE_ENV_VARS } from "../constants/env"; import { EXIT_CODE_ABORTED, EXIT_CODE_TIMEOUT } from "../constants/exitCodes"; import { listLocalBranches } from "../git"; import { checkInitHookExists, getInitHookPath, createLineBufferedLoggers } from "./initHook"; -import { execAsync } from "../utils/disposableExec"; +import { execAsync, DisposableProcess } from "../utils/disposableExec"; import { getProjectName } from "../utils/runtime/helpers"; import { getErrorMessage } from "../utils/errors"; import { expandTilde } from "./tildeExpansion"; @@ -80,13 +80,20 @@ export class LocalRuntime implements Runtime { detached: true, }); + // Wrap in DisposableProcess for automatic cleanup + const disposable = new DisposableProcess(childProcess); + // Convert Node.js streams to Web Streams const stdout = Readable.toWeb(childProcess.stdout) as unknown as ReadableStream; const stderr = Readable.toWeb(childProcess.stderr) as unknown as ReadableStream; const stdin = Writable.toWeb(childProcess.stdin) as unknown as WritableStream; - // Track if we killed the process due to timeout + // No stream cleanup in DisposableProcess - streams close naturally when process exits + // bash.ts handles cleanup after waiting for exitCode + + // Track if we killed the process due to timeout or abort let timedOut = false; + let aborted = false; // Create promises for exit code and duration // Uses special exit codes (EXIT_CODE_ABORTED, EXIT_CODE_TIMEOUT) for expected error conditions @@ -95,52 +102,7 @@ export class LocalRuntime implements Runtime { // The 'close' event waits for ALL child processes (including background ones) to exit, // which causes hangs when users spawn background processes like servers. // The 'exit' event fires when the main bash process exits, which is what we want. - // - // However, stdio streams may not be fully flushed when 'exit' fires, so we need to: - // 1. Track when process exits and when streams close - // 2. Resolve immediately if streams have closed - // 3. Wait with a grace period (50ms) for streams to flush if they haven't closed yet - // 4. Force-close streams after grace period to prevent hangs - let stdoutClosed = false; - let stderrClosed = false; - let processExited = false; - let exitedCode: number | null = null; - - // Track stream closures - childProcess.stdout?.on("close", () => { - stdoutClosed = true; - tryResolve(); - }); - childProcess.stderr?.on("close", () => { - stderrClosed = true; - tryResolve(); - }); - - const tryResolve = () => { - // Only resolve if process has exited AND streams are closed - if (processExited && stdoutClosed && stderrClosed) { - finalizeExit(); - } - }; - - const finalizeExit = () => { - // Check abort first (highest priority) - if (options.abortSignal?.aborted) { - resolve(EXIT_CODE_ABORTED); - return; - } - // Check if we killed the process due to timeout - if (timedOut) { - resolve(EXIT_CODE_TIMEOUT); - return; - } - resolve(exitedCode ?? 
0); - }; - childProcess.on("exit", (code) => { - processExited = true; - exitedCode = code; - // Clean up any background processes (process group cleanup) // This prevents zombie processes when scripts spawn background tasks if (childProcess.pid !== undefined) { @@ -153,20 +115,18 @@ export class LocalRuntime implements Runtime { } } - // Try to resolve immediately if streams have already closed - tryResolve(); - - // Set a grace period timer - if streams don't close within 50ms, finalize anyway - // This handles background processes that keep stdio open - setTimeout(() => { - if (!stdoutClosed || !stderrClosed) { - // Mark streams as closed and finalize without destroying them - // Destroying converted Web Streams causes errors in the conversion layer - stdoutClosed = true; - stderrClosed = true; - finalizeExit(); - } - }, 50); + // Check abort first (highest priority) + if (aborted || options.abortSignal?.aborted) { + resolve(EXIT_CODE_ABORTED); + return; + } + // Check if we killed the process due to timeout + if (timedOut) { + resolve(EXIT_CODE_TIMEOUT); + return; + } + resolve(code ?? 0); + // Cleanup runs automatically via DisposableProcess }); childProcess.on("error", (err) => { @@ -176,34 +136,36 @@ export class LocalRuntime implements Runtime { const duration = exitCode.then(() => performance.now() - startTime); - // Helper to kill entire process group (including background children) - const killProcessGroup = () => { + // Register process group cleanup with DisposableProcess + // This ensures ALL background children are killed when process exits + disposable.addCleanup(() => { if (childProcess.pid === undefined) return; try { // Kill entire process group with SIGKILL - cannot be caught/ignored process.kill(-childProcess.pid, "SIGKILL"); } catch { - // Fallback: try killing just the main process - try { - childProcess.kill("SIGKILL"); - } catch { - // Process already dead - ignore - } + // Process group already dead or doesn't exist - ignore } - }; + }); // Handle abort signal if (options.abortSignal) { - options.abortSignal.addEventListener("abort", killProcessGroup); + options.abortSignal.addEventListener("abort", () => { + aborted = true; + disposable[Symbol.dispose](); // Kill process and run cleanup + }); } // Handle timeout if (options.timeout !== undefined) { - setTimeout(() => { + const timeoutHandle = setTimeout(() => { timedOut = true; - killProcessGroup(); + disposable[Symbol.dispose](); // Kill process and run cleanup }, options.timeout * 1000); + + // Clear timeout if process exits naturally + void exitCode.finally(() => clearTimeout(timeoutHandle)); } return { stdout, stderr, stdin, exitCode, duration }; diff --git a/src/runtime/SSHRuntime.ts b/src/runtime/SSHRuntime.ts index af94e0ace..138a064cb 100644 --- a/src/runtime/SSHRuntime.ts +++ b/src/runtime/SSHRuntime.ts @@ -23,7 +23,7 @@ import { streamProcessToLogger } from "./streamProcess"; import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion"; import { getProjectName } from "../utils/runtime/helpers"; import { getErrorMessage } from "../utils/errors"; -import { execAsync } from "../utils/disposableExec"; +import { execAsync, DisposableProcess } from "../utils/disposableExec"; import { getControlPath } from "./sshConnectionPool"; /** @@ -171,30 +171,37 @@ export class SSHRuntime implements Runtime { stdio: ["pipe", "pipe", "pipe"], }); + // Wrap in DisposableProcess for automatic cleanup + const disposable = new DisposableProcess(sshProcess); + // Convert Node.js streams to Web Streams const stdout = 
Readable.toWeb(sshProcess.stdout) as unknown as ReadableStream; const stderr = Readable.toWeb(sshProcess.stderr) as unknown as ReadableStream; const stdin = Writable.toWeb(sshProcess.stdin) as unknown as WritableStream; - // Track if we killed the process due to timeout + // No stream cleanup in DisposableProcess - streams close naturally when process exits + // bash.ts handles cleanup after waiting for exitCode + + // Track if we killed the process due to timeout or abort let timedOut = false; + let aborted = false; // Create promises for exit code and duration // Uses special exit codes (EXIT_CODE_ABORTED, EXIT_CODE_TIMEOUT) for expected error conditions const exitCode = new Promise((resolve, reject) => { sshProcess.on("close", (code, signal) => { // Check abort first (highest priority) - if (options.abortSignal?.aborted) { + if (aborted || options.abortSignal?.aborted) { resolve(EXIT_CODE_ABORTED); return; } // Check if we killed the process due to timeout - // Don't check signal - if we set timedOut, we timed out regardless of how process died if (timedOut) { resolve(EXIT_CODE_TIMEOUT); return; } resolve(code ?? (signal ? -1 : 0)); + // Cleanup runs automatically via DisposableProcess }); sshProcess.on("error", (err) => { @@ -206,15 +213,21 @@ export class SSHRuntime implements Runtime { // Handle abort signal if (options.abortSignal) { - options.abortSignal.addEventListener("abort", () => sshProcess.kill("SIGKILL")); + options.abortSignal.addEventListener("abort", () => { + aborted = true; + disposable[Symbol.dispose](); // Kill process and run cleanup + }); } // Handle timeout - setTimeout(() => { + const timeoutHandle = setTimeout(() => { timedOut = true; - sshProcess.kill("SIGKILL"); + disposable[Symbol.dispose](); // Kill process and run cleanup }, options.timeout * 1000); + // Clear timeout if process exits naturally + void exitCode.finally(() => clearTimeout(timeoutHandle)); + return { stdout, stderr, stdin, exitCode, duration }; } diff --git a/src/services/tools/bash.ts b/src/services/tools/bash.ts index e145494ed..1ca6bae80 100644 --- a/src/services/tools/bash.ts +++ b/src/services/tools/bash.ts @@ -1,7 +1,6 @@ import { tool } from "ai"; -import { createInterface } from "readline"; +// NOTE: We avoid readline; consume Web Streams directly to prevent race conditions import * as path from "path"; -import { Readable } from "stream"; import { BASH_DEFAULT_TIMEOUT_SECS, BASH_HARD_MAX_LINES, @@ -17,6 +16,198 @@ import type { BashToolResult } from "@/types/tools"; import type { ToolConfiguration, ToolFactory } from "@/utils/tools/tools"; import { TOOL_DEFINITIONS } from "@/utils/tools/toolDefinitions"; +/** + * Validates bash script input for common issues + * Returns error result if validation fails, null if valid + */ +function validateScript(script: string, config: ToolConfiguration): BashToolResult | null { + // Check for empty script + if (!script || script.trim().length === 0) { + return { + success: false, + error: "Script parameter is empty. This likely indicates a malformed tool call.", + exitCode: -1, + wall_duration_ms: 0, + }; + } + + // Block sleep at the beginning of commands - they waste time waiting + if (/^\s*sleep\s/.test(script)) { + return { + success: false, + error: + "do not start commands with sleep; prefer <10s sleeps in busy loops (e.g., 'while ! 
condition; do sleep 1; done' or 'until condition; do sleep 1; done').", + exitCode: -1, + wall_duration_ms: 0, + }; + } + + // Detect redundant cd to working directory + const cdPattern = /^\s*cd\s+['"]?([^'";&|]+)['"]?\s*[;&|]/; + const match = cdPattern.exec(script); + if (match) { + const targetPath = match[1].trim(); + const normalizedTarget = config.runtime.normalizePath(targetPath, config.cwd); + const normalizedCwd = config.runtime.normalizePath(".", config.cwd); + + if (normalizedTarget === normalizedCwd) { + return { + success: false, + error: `Redundant cd to working directory detected. The tool already runs in ${config.cwd} - no cd needed. Remove the 'cd ${targetPath}' prefix.`, + exitCode: -1, + wall_duration_ms: 0, + }; + } + } + + return null; // Valid +} + +/** + * Creates a line handler that enforces truncation limits + * Processes lines for both stdout and stderr with identical logic + */ +function createLineHandler( + lines: string[], + totalBytesRef: { current: number }, + limits: { + maxLineBytes: number; + maxFileBytes: number; + maxTotalBytes: number; + maxLines: number; + }, + state: { + displayTruncated: boolean; + fileTruncated: boolean; + }, + triggerDisplayTruncation: (reason: string) => void, + triggerFileTruncation: (reason: string) => void +): (line: string) => void { + return (line: string) => { + if (state.fileTruncated) return; + + const lineBytes = Buffer.byteLength(line, "utf-8"); + + // Check if line exceeds per-line limit (hard stop - likely corrupt data) + if (lineBytes > limits.maxLineBytes) { + triggerFileTruncation( + `Line ${lines.length + 1} exceeded per-line limit: ${lineBytes} bytes > ${limits.maxLineBytes} bytes` + ); + return; + } + + // Check file limit BEFORE adding line + const bytesAfterLine = totalBytesRef.current + lineBytes + 1; // +1 for newline + if (bytesAfterLine > limits.maxFileBytes) { + triggerFileTruncation( + `Total output would exceed file preservation limit: ${bytesAfterLine} bytes > ${limits.maxFileBytes} bytes (at line ${lines.length + 1})` + ); + return; + } + + // Collect line (even if display is truncated, keep for file) + lines.push(line); + totalBytesRef.current = bytesAfterLine; + + // Check display limits (soft stop - keep collecting for file) + if (!state.displayTruncated) { + if (totalBytesRef.current > limits.maxTotalBytes) { + triggerDisplayTruncation( + `Total output exceeded display limit: ${totalBytesRef.current} bytes > ${limits.maxTotalBytes} bytes (at line ${lines.length})` + ); + return; + } + + if (lines.length >= limits.maxLines) { + triggerDisplayTruncation( + `Line count exceeded display limit: ${lines.length} lines >= ${limits.maxLines} lines (${totalBytesRef.current} bytes read)` + ); + } + } + }; +} + +/** + * Formats the final bash tool result based on exit code and truncation state + */ +function formatResult( + exitCode: number, + lines: string[], + truncated: boolean, + overflowReason: string | null, + wall_duration_ms: number, + overflowPolicy: "tmpfile" | "truncate" +): BashToolResult { + const output = lines.join("\n"); + + // Check for special error codes from runtime + if (exitCode === EXIT_CODE_ABORTED) { + return { + success: false, + error: "Command execution was aborted", + exitCode: -1, + wall_duration_ms, + }; + } + + if (exitCode === EXIT_CODE_TIMEOUT) { + return { + success: false, + error: "Command exceeded timeout", + exitCode: -1, + wall_duration_ms, + }; + } + + // Handle truncation + if (truncated) { + const truncationInfo = { + reason: overflowReason ?? 
"unknown reason", + totalLines: lines.length, + }; + + if (overflowPolicy === "truncate") { + // Return all collected lines with truncation marker + if (exitCode === 0) { + return { + success: true, + output, + exitCode: 0, + wall_duration_ms, + truncated: truncationInfo, + }; + } else { + return { + success: false, + output, + exitCode, + error: `Command exited with code ${exitCode}`, + wall_duration_ms, + truncated: truncationInfo, + }; + } + } + } + + // Normal exit + if (exitCode === 0) { + return { + success: true, + output, + exitCode: 0, + wall_duration_ms, + }; + } else { + return { + success: false, + output, + exitCode, + error: `Command exited with code ${exitCode}`, + wall_duration_ms, + }; + } +} + /** * Bash execution tool factory for AI assistant * Creates a bash tool that can execute commands with a configurable timeout @@ -38,63 +229,16 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => { description: TOOL_DEFINITIONS.bash.description + "\nRuns in " + config.cwd + " - no cd needed", inputSchema: TOOL_DEFINITIONS.bash.schema, execute: async ({ script, timeout_secs }, { abortSignal }): Promise => { - // Validate script is not empty - likely indicates a malformed tool call - - if (!script || script.trim().length === 0) { - return { - success: false, - error: "Script parameter is empty. This likely indicates a malformed tool call.", - exitCode: -1, - wall_duration_ms: 0, - }; - } - - // Block sleep at the beginning of commands - they waste time waiting. Use polling loops instead. - if (/^\s*sleep\s/.test(script)) { - return { - success: false, - error: - "do not start commands with sleep; prefer <10s sleeps in busy loops (e.g., 'while ! condition; do sleep 1; done' or 'until condition; do sleep 1; done').", - exitCode: -1, - wall_duration_ms: 0, - }; - } + // Validate script input + const validationError = validateScript(script, config); + if (validationError) return validationError; - // Default timeout to 3 seconds for interactivity - // OpenAI models often don't provide timeout_secs even when marked required, - // so we make it optional with a sensible default. + // Setup execution parameters const effectiveTimeout = timeout_secs ?? BASH_DEFAULT_TIMEOUT_SECS; - const startTime = performance.now(); - let totalBytesAccumulated = 0; + const totalBytesRef = { current: 0 }; // Use ref for shared access in line handler let overflowReason: string | null = null; - - // Two-stage truncation to prevent re-running expensive commands: - // 1. Display truncation (16KB): Stop showing output to agent, but keep collecting - // 2. File truncation (100KB): Stop collecting entirely and kill the process - // This allows agents to access full output via temp file without re-running - let displayTruncated = false; // Hit 16KB display limit - let fileTruncated = false; // Hit 100KB file limit - - // Detect redundant cd to working directory - // Match patterns like: "cd /path &&", "cd /path;", "cd '/path' &&", "cd \"/path\" &&" - const cdPattern = /^\s*cd\s+['"]?([^'";&|]+)['"]?\s*[;&|]/; - const match = cdPattern.exec(script); - if (match) { - const targetPath = match[1].trim(); - // Normalize paths for comparison using runtime's path resolution - const normalizedTarget = config.runtime.normalizePath(targetPath, config.cwd); - const normalizedCwd = config.runtime.normalizePath(".", config.cwd); - - if (normalizedTarget === normalizedCwd) { - return { - success: false, - error: `Redundant cd to working directory detected. 
The tool already runs in ${config.cwd} - no cd needed. Remove the 'cd ${targetPath}' prefix.`, - exitCode: -1, - wall_duration_ms: 0, - }; - } - } + const truncationState = { displayTruncated: false, fileTruncated: false }; // Execute using runtime interface (works for both local and SSH) const execStream = await config.runtime.exec(script, { @@ -105,346 +249,148 @@ export const createBashTool: ToolFactory = (config: ToolConfiguration) => { abortSignal, }); - // Use a promise to wait for completion - return await new Promise((resolve, _reject) => { - const lines: string[] = []; - let truncated = false; - let exitCode: number | null = null; - let resolved = false; - - // Forward-declare teardown function that will be defined below - // eslint-disable-next-line prefer-const - let teardown: () => void; - - // Helper to resolve once - const resolveOnce = (result: BashToolResult) => { - if (!resolved) { - resolved = true; - // Clean up abort listener if present - if (abortSignal && abortListener) { - abortSignal.removeEventListener("abort", abortListener); - } - resolve(result); - } - }; - - // Set up abort signal listener - immediately resolve on abort - let abortListener: (() => void) | null = null; - if (abortSignal) { - abortListener = () => { - if (!resolved) { - // Immediately resolve with abort error to unblock AI SDK stream - // The runtime will handle killing the actual process - teardown(); - resolveOnce({ - success: false, - error: "Command execution was aborted", - exitCode: -2, - wall_duration_ms: Math.round(performance.now() - startTime), - }); - } - }; - abortSignal.addEventListener("abort", abortListener); - } + // Force-close stdin immediately - we don't need to send any input + // Use abort() instead of close() for immediate, synchronous closure + // close() is async and waits for acknowledgment, which can hang over SSH + // abort() immediately marks stream as errored and releases locks + execStream.stdin.abort().catch(() => { + /* ignore */ return; + }); - // Force-close stdin immediately - we don't need to send any input - // Use abort() instead of close() for immediate, synchronous closure - // close() is async and waits for acknowledgment, which can hang over SSH - // abort() immediately marks stream as errored and releases locks - execStream.stdin.abort().catch(() => { - // Ignore errors - stream might already be closed + // Collect output concurrently from Web Streams to avoid readline race conditions. 
+ const lines: string[] = []; + let truncated = false; + + // Helper to trigger display truncation (stop showing to agent, keep collecting) + const triggerDisplayTruncation = (reason: string) => { + truncationState.displayTruncated = true; + truncated = true; + overflowReason = reason; + // Don't kill process yet - keep collecting up to file limit + }; + + // Helper to trigger file truncation (stop collecting, close streams) + const triggerFileTruncation = (reason: string) => { + truncationState.fileTruncated = true; + truncationState.displayTruncated = true; + truncated = true; + overflowReason = reason; + // Cancel the streams to stop the process and unblock readers + execStream.stdout.cancel().catch(() => { + /* ignore */ return; }); - - // Convert Web Streams to Node.js streams for readline - // Type mismatch between Node.js ReadableStream and Web ReadableStream - safe to cast - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any - const stdoutNodeStream = Readable.fromWeb(execStream.stdout as any); - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any - const stderrNodeStream = Readable.fromWeb(execStream.stderr as any); - - // Set up readline for both stdout and stderr to handle buffering - const stdoutReader = createInterface({ input: stdoutNodeStream }); - const stderrReader = createInterface({ input: stderrNodeStream }); - - // Track when streams end - let stdoutEnded = false; - let stderrEnded = false; - - // Forward-declare functions that will be defined below - // eslint-disable-next-line prefer-const - let tryFinalize: () => void; - // eslint-disable-next-line prefer-const - let finalize: () => void; - - // Define teardown (already declared above) - teardown = () => { - stdoutReader.close(); - stderrReader.close(); - stdoutNodeStream.destroy(); - stderrNodeStream.destroy(); - }; - - // IMPORTANT: Attach exit handler IMMEDIATELY to prevent unhandled rejection - // Handle both normal exits and special error codes (EXIT_CODE_ABORTED, EXIT_CODE_TIMEOUT) - execStream.exitCode - .then((code) => { - exitCode = code; - - // Check for special error codes from runtime - if (code === EXIT_CODE_ABORTED) { - // Aborted via AbortSignal - teardown(); - resolveOnce({ - success: false, - error: "Command execution was aborted", - exitCode: -1, - wall_duration_ms: Math.round(performance.now() - startTime), - }); - return; - } - - if (code === EXIT_CODE_TIMEOUT) { - // Exceeded timeout - teardown(); - resolveOnce({ - success: false, - error: `Command exceeded timeout of ${effectiveTimeout} seconds`, - exitCode: -1, - wall_duration_ms: Math.round(performance.now() - startTime), + execStream.stderr.cancel().catch(() => { + /* ignore */ return; + }); + }; + + // Create unified line handler for both stdout and stderr + const lineHandler = createLineHandler( + lines, + totalBytesRef, + { maxLineBytes, maxFileBytes, maxTotalBytes, maxLines }, + truncationState, + triggerDisplayTruncation, + triggerFileTruncation + ); + + // Consume a ReadableStream and emit lines to lineHandler. + // Uses TextDecoder streaming to preserve multibyte boundaries. 
+ const consumeStream = async (stream: ReadableStream): Promise => { + const reader = stream.getReader(); + const decoder = new TextDecoder("utf-8"); + let carry = ""; + try { + while (true) { + if (truncationState.fileTruncated) { + // Stop early if we already hit hard limits + await reader.cancel().catch(() => { + /* ignore */ return; }); - return; + break; } - - // Normal exit - try to finalize if streams have already closed - tryFinalize(); - // Set a grace period - if streams don't close within 50ms, force finalize - setTimeout(() => { - if (!resolved && exitCode !== null) { - stdoutNodeStream.destroy(); - stderrNodeStream.destroy(); - stdoutEnded = true; - stderrEnded = true; - tryFinalize(); - } - }, 50); - }) - .catch((err: Error) => { - // Only actual errors (like spawn failure) should reach here now - teardown(); - resolveOnce({ - success: false, - error: `Failed to execute command: ${err.message}`, - exitCode: -1, - wall_duration_ms: Math.round(performance.now() - startTime), - }); - }); - - // Helper to trigger display truncation (stop showing to agent, keep collecting) - const triggerDisplayTruncation = (reason: string) => { - displayTruncated = true; - truncated = true; - overflowReason = reason; - // Don't kill process yet - keep collecting up to file limit - }; - - // Helper to trigger file truncation (stop collecting, close streams) - const triggerFileTruncation = (reason: string) => { - fileTruncated = true; - displayTruncated = true; - truncated = true; - overflowReason = reason; - stdoutReader.close(); - stderrReader.close(); - // Cancel the streams to stop the process - // eslint-disable-next-line @typescript-eslint/no-empty-function - execStream.stdout.cancel().catch(() => {}); - // eslint-disable-next-line @typescript-eslint/no-empty-function - execStream.stderr.cancel().catch(() => {}); - }; - - stdoutReader.on("line", (line) => { - if (!resolved && !fileTruncated) { - const lineBytes = Buffer.byteLength(line, "utf-8"); - - // Check if line exceeds per-line limit (hard stop - this is likely corrupt data) - if (lineBytes > maxLineBytes) { - triggerFileTruncation( - `Line ${lines.length + 1} exceeded per-line limit: ${lineBytes} bytes > ${maxLineBytes} bytes` - ); - return; - } - - // Check file limit BEFORE adding line to prevent overlong lines from being returned - const bytesAfterLine = totalBytesAccumulated + lineBytes + 1; // +1 for newline - if (bytesAfterLine > maxFileBytes) { - triggerFileTruncation( - `Total output would exceed file preservation limit: ${bytesAfterLine} bytes > ${maxFileBytes} bytes (at line ${lines.length + 1})` - ); - return; - } - - // Collect this line (even if display is truncated, keep for file) - lines.push(line); - totalBytesAccumulated = bytesAfterLine; - - // Check display limits (soft stop - keep collecting for file) - if (!displayTruncated) { - if (totalBytesAccumulated > maxTotalBytes) { - triggerDisplayTruncation( - `Total output exceeded display limit: ${totalBytesAccumulated} bytes > ${maxTotalBytes} bytes (at line ${lines.length})` - ); - return; - } - - if (lines.length >= maxLines) { - triggerDisplayTruncation( - `Line count exceeded display limit: ${lines.length} lines >= ${maxLines} lines (${totalBytesAccumulated} bytes read)` - ); + const { value, done } = await reader.read(); + if (done) break; + // Decode chunk (streaming keeps partial code points) + const text = decoder.decode(value, { stream: true }); + carry += text; + // Split into lines; support both \n and \r\n + let start = 0; + while (true) { + const idxN = 
carry.indexOf("\n", start); + const idxR = carry.indexOf("\r", start); + let nextIdx = -1; + if (idxN === -1 && idxR === -1) break; + nextIdx = idxN === -1 ? idxR : idxR === -1 ? idxN : Math.min(idxN, idxR); + const line = carry.slice(0, nextIdx).replace(/\r$/, ""); + lineHandler(line); + carry = carry.slice(nextIdx + 1); + start = 0; + if (truncationState.fileTruncated) { + await reader.cancel().catch(() => { + /* ignore */ return; + }); + break; } } + if (truncationState.fileTruncated) break; } - }); - - stderrReader.on("line", (line) => { - if (!resolved && !fileTruncated) { - const lineBytes = Buffer.byteLength(line, "utf-8"); - - // Check if line exceeds per-line limit (hard stop - this is likely corrupt data) - if (lineBytes > maxLineBytes) { - triggerFileTruncation( - `Line ${lines.length + 1} exceeded per-line limit: ${lineBytes} bytes > ${maxLineBytes} bytes` - ); - return; + } finally { + // Flush decoder for any trailing bytes and emit the last line (if any) + try { + const tail = decoder.decode(); + if (tail) carry += tail; + if (carry.length > 0 && !truncationState.fileTruncated) { + lineHandler(carry); } - - // Check file limit BEFORE adding line to prevent overlong lines from being returned - const bytesAfterLine = totalBytesAccumulated + lineBytes + 1; // +1 for newline - if (bytesAfterLine > maxFileBytes) { - triggerFileTruncation( - `Total output would exceed file preservation limit: ${bytesAfterLine} bytes > ${maxFileBytes} bytes (at line ${lines.length + 1})` - ); - return; - } - - // Collect this line (even if display is truncated, keep for file) - lines.push(line); - totalBytesAccumulated = bytesAfterLine; - - // Check display limits (soft stop - keep collecting for file) - if (!displayTruncated) { - if (totalBytesAccumulated > maxTotalBytes) { - triggerDisplayTruncation( - `Total output exceeded display limit: ${totalBytesAccumulated} bytes > ${maxTotalBytes} bytes (at line ${lines.length})` - ); - return; - } - - if (lines.length >= maxLines) { - triggerDisplayTruncation( - `Line count exceeded display limit: ${lines.length} lines >= ${maxLines} lines (${totalBytesAccumulated} bytes read)` - ); - } - } - } - }); - - // Define tryFinalize (already declared above) - tryFinalize = () => { - if (resolved) return; - // Only finalize when both streams have closed and we have an exit code - if (stdoutEnded && stderrEnded && exitCode !== null) { - finalize(); + } catch { + // ignore decoder errors on flush } - }; + } + }; - stdoutReader.on("close", () => { - stdoutEnded = true; - tryFinalize(); - }); + // Start consuming stdout and stderr concurrently + const consumeStdout = consumeStream(execStream.stdout); + const consumeStderr = consumeStream(execStream.stderr); - stderrReader.on("close", () => { - stderrEnded = true; - tryFinalize(); - }); + // Wait for process exit and stream consumption concurrently + let exitCode: number; + try { + [exitCode] = await Promise.all([execStream.exitCode, consumeStdout, consumeStderr]); + } catch (err: unknown) { + return { + success: false, + error: `Failed to execute command: ${err instanceof Error ? err.message : String(err)}`, + exitCode: -1, + wall_duration_ms: Math.round(performance.now() - startTime), + }; + } - // Define finalize (already declared above) - finalize = () => { - if (resolved) return; - - // Round to integer to preserve tokens. 
- const wall_duration_ms = Math.round(performance.now() - startTime); - - // Clean up readline interfaces if still open - stdoutReader.close(); - stderrReader.close(); - - // Check if this was aborted (stream cancelled) - const wasAborted = abortSignal?.aborted ?? false; - - if (wasAborted) { - resolveOnce({ - success: false, - error: "Command aborted due to stream cancellation", - exitCode: -2, - wall_duration_ms, - }); - } else if (truncated) { - // Handle overflow based on policy - const overflowPolicy = config.overflow_policy ?? "tmpfile"; - - if (overflowPolicy === "truncate") { - // Return ALL collected lines (up to the limit that triggered truncation) - // With 1MB/10K line limits, this can be thousands of lines for UI to parse - const output = lines.join("\n"); - - if (exitCode === 0 || exitCode === null) { - // Success but truncated - resolveOnce({ - success: true, - output, - exitCode: 0, - wall_duration_ms, - truncated: { - reason: overflowReason ?? "unknown reason", - totalLines: lines.length, - }, - }); - } else { - // Failed and truncated - resolveOnce({ - success: false, - output, - exitCode, - error: `Command exited with code ${exitCode}`, - wall_duration_ms, - truncated: { - reason: overflowReason ?? "unknown reason", - totalLines: lines.length, - }, - }); - } - } else { - // tmpfile policy: Save overflow output to temp file instead of returning an error - // We don't show ANY of the actual output to avoid overwhelming context. - // Instead, save it to a temp file and encourage the agent to use filtering tools. - (async () => { - try { - // Use 8 hex characters for short, memorable temp file IDs - const fileId = Math.random().toString(16).substring(2, 10); - // Write to runtime temp directory (managed by StreamManager) - // Use path.posix.join to preserve forward slashes for SSH runtime - // (config.runtimeTempDir is always a POSIX path like /home/user/.cmux-tmp/token) - const overflowPath = path.posix.join(config.runtimeTempDir, `bash-${fileId}.txt`); - const fullOutput = lines.join("\n"); - - // Use runtime.writeFile() for SSH support - const writer = config.runtime.writeFile(overflowPath, abortSignal); - const encoder = new TextEncoder(); - const writerInstance = writer.getWriter(); - await writerInstance.write(encoder.encode(fullOutput)); - await writerInstance.close(); - - const output = `[OUTPUT OVERFLOW - ${overflowReason ?? "unknown reason"}] + // Round to integer to preserve tokens + const wall_duration_ms = Math.round(performance.now() - startTime); + + // Handle tmpfile overflow policy separately (writes to file) + if (truncated && (config.overflow_policy ?? "tmpfile") === "tmpfile") { + // tmpfile policy: Save overflow output to temp file instead of returning an error + // We don't show ANY of the actual output to avoid overwhelming context. + // Instead, save it to a temp file and encourage the agent to use filtering tools. 
+ try { + // Use 8 hex characters for short, memorable temp file IDs + const fileId = Math.random().toString(16).substring(2, 10); + // Write to runtime temp directory (managed by StreamManager) + // Use path.posix.join to preserve forward slashes for SSH runtime + // (config.runtimeTempDir is always a POSIX path like /home/user/.cmux-tmp/token) + const overflowPath = path.posix.join(config.runtimeTempDir, `bash-${fileId}.txt`); + const fullOutput = lines.join("\n"); + + // Use runtime.writeFile() for SSH support + const writer = config.runtime.writeFile(overflowPath, abortSignal); + const encoder = new TextEncoder(); + const writerInstance = writer.getWriter(); + await writerInstance.write(encoder.encode(fullOutput)); + await writerInstance.close(); + + const output = `[OUTPUT OVERFLOW - ${overflowReason ?? "unknown reason"}] Full output (${lines.length} lines) saved to ${overflowPath} @@ -452,57 +398,32 @@ Use selective filtering tools (e.g. grep) to extract relevant information and co File will be automatically cleaned up when stream ends.`; - resolveOnce({ - success: false, - error: output, - exitCode: -1, - wall_duration_ms, - }); - } catch (err) { - // If temp file creation fails, fall back to original error - resolveOnce({ - success: false, - error: `Command output overflow: ${overflowReason ?? "unknown reason"}. Failed to save overflow to temp file: ${String(err)}`, - exitCode: -1, - wall_duration_ms, - }); - } - })(); - } - } else if (exitCode === EXIT_CODE_TIMEOUT) { - // Timeout - special exit code from runtime - resolveOnce({ - success: false, - error: `Command exceeded timeout of ${effectiveTimeout} seconds`, - exitCode: -1, - wall_duration_ms, - }); - } else if (exitCode === EXIT_CODE_ABORTED) { - // Aborted - special exit code from runtime - resolveOnce({ - success: false, - error: "Command execution was aborted", - exitCode: -1, - wall_duration_ms, - }); - } else if (exitCode === 0 || exitCode === null) { - resolveOnce({ - success: true, - output: lines.join("\n"), - exitCode: 0, - wall_duration_ms, - }); - } else { - resolveOnce({ - success: false, - output: lines.join("\n"), - exitCode, - error: `Command exited with code ${exitCode}`, - wall_duration_ms, - }); - } - }; - }); + return { + success: false, + error: output, + exitCode: -1, + wall_duration_ms, + }; + } catch (err) { + // If temp file creation fails, fall back to original error + return { + success: false, + error: `Command output overflow: ${overflowReason ?? "unknown reason"}. Failed to save overflow to temp file: ${String(err)}`, + exitCode: -1, + wall_duration_ms, + }; + } + } + + // Format result based on exit code and truncation state + return formatResult( + exitCode, + lines, + truncated, + overflowReason, + wall_duration_ms, + config.overflow_policy ?? "tmpfile" + ); }, }); }; diff --git a/src/utils/disposableExec.ts b/src/utils/disposableExec.ts index 21e43f9ce..0581c32a4 100644 --- a/src/utils/disposableExec.ts +++ b/src/utils/disposableExec.ts @@ -1,6 +1,90 @@ import { exec } from "child_process"; import type { ChildProcess } from "child_process"; +/** + * Disposable wrapper for child processes that ensures immediate cleanup. + * Implements TypeScript's explicit resource management (using) for process lifecycle. 
+ * + * All registered cleanup callbacks execute immediately when disposed, either: + * - Explicitly via Symbol.dispose + * - Automatically when exiting a `using` block + * - On process exit + * + * Usage: + * const process = spawn("command"); + * const disposable = new DisposableProcess(process); + * disposable.addCleanup(() => stream.destroy()); + * // Cleanup runs automatically on process exit + */ +export class DisposableProcess implements Disposable { + private cleanupCallbacks: Array<() => void> = []; + private disposed = false; + + constructor(private readonly process: ChildProcess) { + // No auto-cleanup - callers explicitly dispose via timeout/abort handlers + // Process streams close naturally when process exits + } + + /** + * Register cleanup callback to run when process is disposed. + * If already disposed, runs immediately. + */ + addCleanup(callback: () => void): void { + if (this.disposed) { + // Already disposed, run immediately + try { + callback(); + } catch { + // Ignore errors during cleanup + } + } else { + this.cleanupCallbacks.push(callback); + } + } + + /** + * Get the underlying child process + */ + get underlying(): ChildProcess { + return this.process; + } + + /** + * Cleanup: kill process + run all cleanup callbacks immediately. + * Safe to call multiple times (idempotent). + */ + [Symbol.dispose](): void { + if (this.disposed) return; + this.disposed = true; + + // Kill process if still running + // Check both exitCode and signalCode to avoid calling kill() on already-dead processes + // When a process exits via signal (e.g., segfault, kill $$), exitCode is null but signalCode is set + if ( + !this.process.killed && + this.process.exitCode === null && + this.process.signalCode === null + ) { + try { + this.process.kill("SIGKILL"); + } catch { + // Ignore ESRCH errors - process may have exited between check and kill + } + } + + // Run all cleanup callbacks + for (const callback of this.cleanupCallbacks) { + try { + callback(); + } catch { + // Ignore cleanup errors - we're tearing down anyway + } + } + + this.cleanupCallbacks = []; + } +} + /** * Disposable wrapper for exec that ensures child process cleanup. * Prevents zombie processes by killing child when scope exits. diff --git a/tests/ipcMain/runtimeExecuteBash.test.ts b/tests/ipcMain/runtimeExecuteBash.test.ts index 506db679b..4bd8464dc 100644 --- a/tests/ipcMain/runtimeExecuteBash.test.ts +++ b/tests/ipcMain/runtimeExecuteBash.test.ts @@ -286,6 +286,81 @@ describeIntegration("Runtime Bash Execution", () => { }, type === "ssh" ? 
TEST_TIMEOUT_SSH_MS : TEST_TIMEOUT_LOCAL_MS ); + + test.concurrent( + "should not hang on grep | head pattern over SSH", + async () => { + const env = await createTestEnvironment(); + const tempGitRepo = await createTempGitRepo(); + + try { + // Setup provider + await setupProviders(env.mockIpcRenderer, { + anthropic: { + apiKey: getApiKey("ANTHROPIC_API_KEY"), + }, + }); + + // Create workspace + const branchName = generateBranchName("bash-grep-head"); + const runtimeConfig = getRuntimeConfig(branchName); + const { workspaceId, cleanup } = await createWorkspaceWithInit( + env, + tempGitRepo, + branchName, + runtimeConfig, + true, // waitForInit + type === "ssh" + ); + + try { + // Create some test files to search through + await sendMessageAndWait( + env, + workspaceId, + 'Run bash: for i in {1..1000}; do echo "terminal bench line $i" >> testfile.txt; done', + HAIKU_MODEL, + BASH_ONLY + ); + + // Test grep | head pattern - this historically hangs over SSH + // This is a regression test for the bash hang issue + const startTime = Date.now(); + const events = await sendMessageAndWait( + env, + workspaceId, + 'Run bash: grep -n "terminal bench" testfile.txt | head -n 200', + HAIKU_MODEL, + BASH_ONLY, + 15000 // 15s timeout - should complete quickly + ); + const duration = Date.now() - startTime; + + // Extract response text + const responseText = extractTextFromEvents(events); + + // Verify command completed successfully (not timeout) + expect(responseText).toContain("terminal bench"); + + // Verify command completed quickly (not hanging until timeout) + // SSH runtime should complete in <10s even with high latency + const maxDuration = 15000; + expect(duration).toBeLessThan(maxDuration); + + // Verify bash tool was called + const toolCallStarts = events.filter((e: any) => e.type === "tool-call-start"); + const bashCalls = toolCallStarts.filter((e: any) => e.toolName === "bash"); + expect(bashCalls.length).toBeGreaterThan(0); + } finally { + await cleanup(); + } + } finally { + await cleanupTempGitRepo(tempGitRepo); + await cleanupTestEnvironment(env); + } + }, + type === "ssh" ? TEST_TIMEOUT_SSH_MS : TEST_TIMEOUT_LOCAL_MS + ); } ); });
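
The `DisposableProcess` wiring that `LocalRuntime.ts` adopts above reduces to the pattern below. This is a minimal sketch, not the runtime code itself: `runWithCleanup` and the `bash -c` invocation are illustrative, and the mapping to `EXIT_CODE_ABORTED` / `EXIT_CODE_TIMEOUT` is omitted. It assumes a runtime where `Symbol.dispose` exists (recent Node, or a polyfill), which the diff already relies on.

```ts
import { spawn } from "child_process";
import { DisposableProcess } from "../utils/disposableExec";

// Illustrative helper (not part of the diff). detached: true makes the child a
// process-group leader, so kill(-pid) reaches any background children it spawns.
function runWithCleanup(
  script: string,
  timeoutSecs: number,
  abortSignal?: AbortSignal
): Promise<number> {
  const child = spawn("bash", ["-c", script], {
    detached: true,
    stdio: ["pipe", "pipe", "pipe"],
  });
  const disposable = new DisposableProcess(child);

  // Registered cleanup: SIGKILL the whole process group, mirroring LocalRuntime.
  disposable.addCleanup(() => {
    if (child.pid === undefined) return;
    try {
      process.kill(-child.pid, "SIGKILL"); // negative pid targets the group
    } catch {
      // group already gone - ignore
    }
  });

  const exitCode = new Promise<number>((resolve) => {
    child.on("exit", (code) => resolve(code ?? 0));
  });

  // Abort and timeout both funnel through Symbol.dispose, which is idempotent:
  // it kills the main process (if still alive) and runs the cleanup above.
  abortSignal?.addEventListener("abort", () => disposable[Symbol.dispose]());
  const timer = setTimeout(() => disposable[Symbol.dispose](), timeoutSecs * 1000);
  void exitCode.finally(() => clearTimeout(timer));

  return exitCode;
}
```

The key design point is that there is exactly one teardown path: abort, timeout, and explicit disposal all converge on `Symbol.dispose`, so the old fallback chain of "kill group, else kill process" cannot diverge from the cleanup registered on the wrapper.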
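
Because `DisposableProcess` implements `Disposable`, it also composes with explicit resource management, as its doc comment notes. A sketch under stated assumptions: TypeScript 5.2+ for `using` declarations, a runtime providing `Symbol.dispose`, and a hypothetical `headCommit` helper that is not part of the diff.

```ts
import { spawn } from "child_process";
import { DisposableProcess } from "../utils/disposableExec";

async function headCommit(): Promise<string> {
  using proc = new DisposableProcess(spawn("git", ["rev-parse", "HEAD"]));
  const chunks: Buffer[] = [];
  proc.underlying.stdout?.on("data", (chunk: Buffer) => chunks.push(chunk));
  await new Promise<void>((resolve) => proc.underlying.on("exit", () => resolve()));
  return Buffer.concat(chunks).toString("utf-8").trim();
  // Leaving this scope triggers proc[Symbol.dispose]() automatically: the
  // process is killed if still running and registered cleanup callbacks fire.
}
```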
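
The `consumeStream` helper added to `bash.ts` replaces readline with direct Web Stream consumption, using `TextDecoder` in streaming mode so multibyte characters split across chunks decode correctly. A standalone sketch of that idea follows; it is simplified relative to the diff (it splits on `\n` and strips a trailing `\r`, and omits the early-cancel-on-truncation path).

```ts
// Minimal line splitter over a Web ReadableStream of UTF-8 bytes.
async function readLines(
  stream: ReadableStream<Uint8Array>,
  onLine: (line: string) => void
): Promise<void> {
  const reader = stream.getReader();
  const decoder = new TextDecoder("utf-8");
  let carry = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      // stream: true keeps partial code points inside the decoder's state
      carry += decoder.decode(value, { stream: true });
      let idx = carry.indexOf("\n");
      while (idx !== -1) {
        onLine(carry.slice(0, idx).replace(/\r$/, "")); // handle \r\n endings
        carry = carry.slice(idx + 1);
        idx = carry.indexOf("\n");
      }
    }
  } finally {
    carry += decoder.decode(); // flush any trailing bytes
    if (carry.length > 0) onLine(carry); // emit the final unterminated line
    reader.releaseLock();
  }
}
```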
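
The control-flow change in `bash.ts` is easiest to see in isolation: one `Promise.all` over the exit code and both stream consumers replaces the readline listeners, the forward-declared `tryFinalize`/`finalize` pair, and the 50 ms grace period. The sketch below is condensed; `ExecStream` is an assumed shape mirroring what `Runtime.exec()` returns in this codebase, and `consumeLines` stands in for the line-splitting helper above.

```ts
interface ExecStream {
  stdout: ReadableStream<Uint8Array>;
  stderr: ReadableStream<Uint8Array>;
  stdin: WritableStream<Uint8Array>;
  exitCode: Promise<number>;
}

async function collect(
  exec: ExecStream,
  consumeLines: (
    stream: ReadableStream<Uint8Array>,
    onLine: (line: string) => void
  ) => Promise<void>
): Promise<{ exitCode: number; lines: string[] }> {
  // abort() tears stdin down immediately; close() waits for acknowledgment
  // and can hang over SSH, which is why the diff switches to abort().
  void exec.stdin.abort().catch(() => undefined);

  const lines: string[] = [];

  // Waiting on the exit code and both consumers together means the tool only
  // returns once output is fully drained - no race between process exit and
  // stream close, and no grace-period heuristic.
  const [exitCode] = await Promise.all([
    exec.exitCode,
    consumeLines(exec.stdout, (line) => lines.push(line)),
    consumeLines(exec.stderr, (line) => lines.push(line)),
  ]);

  return { exitCode, lines };
}
```

This is also what the new `grep | head` regression test exercises: with the runtime resolving on `exit` and the tool draining streams concurrently, a pipeline whose downstream closes early no longer leaves the tool waiting on stdio that never closes.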
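
Finally, the two-stage truncation policy factored into `createLineHandler` can be sketched on its own: past the display limit output is still collected (so it can be preserved in the overflow temp file), and only past the per-line or total file limit does collection stop entirely. Names (`makeLineCollector`, `TruncationState`) and the inline reason strings are illustrative; the real code uses separate `triggerDisplayTruncation` / `triggerFileTruncation` callbacks and the `BASH_*` limit constants.

```ts
interface TruncationState {
  displayTruncated: boolean;
  fileTruncated: boolean;
  reason: string | null;
}

function makeLineCollector(
  lines: string[],
  limits: { maxLineBytes: number; maxTotalBytes: number; maxFileBytes: number; maxLines: number },
  state: TruncationState
): (line: string) => void {
  let totalBytes = 0;
  return (line) => {
    if (state.fileTruncated) return;
    const lineBytes = Buffer.byteLength(line, "utf-8");
    // Hard stop: oversized line or total beyond the file-preservation limit.
    if (lineBytes > limits.maxLineBytes || totalBytes + lineBytes + 1 > limits.maxFileBytes) {
      state.fileTruncated = state.displayTruncated = true;
      state.reason = `output exceeded hard limit near line ${lines.length + 1}`;
      return;
    }
    lines.push(line);
    totalBytes += lineBytes + 1; // +1 for the newline
    // Soft stop: stop showing output to the agent but keep collecting for the file.
    if (
      !state.displayTruncated &&
      (totalBytes > limits.maxTotalBytes || lines.length >= limits.maxLines)
    ) {
      state.displayTruncated = true;
      state.reason = `output exceeded display limit at ${lines.length} lines / ${totalBytes} bytes`;
    }
  };
}
```

In the diff, the hard-stop path additionally cancels stdout and stderr so the child stops producing output, while the soft-stop path leaves the streams running until the file limit is reached.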