From 917cea8fde3f3256c01c14e2d45dd595ae14b806 Mon Sep 17 00:00:00 2001 From: Adam Stein Date: Sat, 8 Nov 2025 22:16:27 -0500 Subject: [PATCH] Add a timeout argument to PythonInterpreter --- dspy/primitives/python_interpreter.py | 38 ++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/dspy/primitives/python_interpreter.py b/dspy/primitives/python_interpreter.py index 66eb126036..5dee168ca6 100644 --- a/dspy/primitives/python_interpreter.py +++ b/dspy/primitives/python_interpreter.py @@ -1,7 +1,9 @@ import json import os import subprocess +import threading from os import PathLike +from queue import Empty, Queue from types import TracebackType from typing import Any @@ -33,6 +35,7 @@ def __init__( enable_env_vars: list[str] | None = None, enable_network_access: list[str] | None = None, sync_files: bool = True, + timeout: float | None = None, ) -> None: """ Args: @@ -76,6 +79,7 @@ def __init__( self.deno_process = None self._mounted_files = False + self.timeout = timeout def _get_runner_path(self) -> str: current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -187,7 +191,7 @@ def execute( self.deno_process.stdin.flush() # Read one JSON line from stdout - output_line = self.deno_process.stdout.readline().strip() + output_line = self._read_output_line(self.timeout) if not output_line: # Possibly the subprocess died or gave no output err_output = self.deno_process.stderr.read() @@ -217,6 +221,38 @@ def execute( self._sync_files() return result.get("output", None) + def _read_output_line(self, timeout: float | None) -> str: + if timeout is None: + return self.deno_process.stdout.readline().strip() + + queue: Queue[str | Exception] = Queue(maxsize=1) + + def _reader(): + try: + line = self.deno_process.stdout.readline() + except Exception as exc: + queue.put(exc) + else: + queue.put(line) + + reader = threading.Thread(target=_reader, daemon=True) + reader.start() + try: + result = queue.get(timeout=timeout) + except Empty: + self._handle_timeout() + raise InterpreterError(f"PythonInterpreter execution exceeded {timeout} seconds.") from None + + if isinstance(result, Exception): + raise result + return result.strip() + + def _handle_timeout(self) -> None: + if self.deno_process and self.deno_process.poll() is None: + self.deno_process.kill() + self.deno_process.wait() + self.deno_process = None + def __enter__(self): return self