From 3b11f6f30c38471e92cb491b6d5882ac242a4588 Mon Sep 17 00:00:00 2001 From: John-Mason Shackelford Date: Thu, 6 Nov 2025 16:59:28 -0500 Subject: [PATCH 1/2] docs(examples): demonstrate advanced visualizer patterns with metrics tracking - Show how to build stateful visualizers with incremental metrics display - Demonstrate tracking LLM costs and token usage from conversation stats - Teach handling multiple event types (ActionEvent, MessageEvent, AgentErrorEvent) - Example extracting command and path details from tool action events --- .../01_standalone_sdk/26_custom_visualizer.py | 346 ++++++++++++++++-- 1 file changed, 306 insertions(+), 40 deletions(-) diff --git a/examples/01_standalone_sdk/26_custom_visualizer.py b/examples/01_standalone_sdk/26_custom_visualizer.py index c6aed6884a..c5cf7cac2c 100644 --- a/examples/01_standalone_sdk/26_custom_visualizer.py +++ b/examples/01_standalone_sdk/26_custom_visualizer.py @@ -6,72 +6,338 @@ - Direct configuration (just pass the visualizer instance to visualizer parameter) - Reusable visualizer that can be shared across conversations +The MinimalProgressVisualizer produces concise output showing: +- LLM call completions with cost and token information +- Tool execution steps with command/path details +- Agent thinking indicators +- Error messages + This demonstrates how you can pass a ConversationVisualizer instance directly to the visualizer parameter for clean, reusable visualization logic. """ import logging import os +from collections.abc import Callable from pydantic import SecretStr from openhands.sdk import LLM, Conversation from openhands.sdk.conversation.visualizer import ConversationVisualizerBase from openhands.sdk.event import ( + ActionEvent, + AgentErrorEvent, Event, + MessageEvent, + ObservationEvent, ) from openhands.tools.preset.default import get_default_agent -class MinimalVisualizer(ConversationVisualizerBase): - """A minimal visualizer that print the raw events as they occur.""" +def handles(event_type: type[Event]): + """Decorator to register a method as an event handler.""" + + def decorator(func): + func._handles_event_type = event_type + return func + + return decorator + + +class EventHandlerMixin: + """Mixin that provides event handler registration via decorators.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._event_handlers: dict[type[Event], Callable[[Event], None]] = {} + self._register_handlers() + + def _register_handlers(self): + """Automatically discover and register event handlers.""" + for attr_name in dir(self): + attr = getattr(self, attr_name) + if hasattr(attr, "_handles_event_type"): + event_type = attr._handles_event_type + self._event_handlers[event_type] = attr + + def on_event(self, event: Event) -> None: + """Dispatch events to registered handlers.""" + event_type = type(event) + handler = self._event_handlers.get(event_type) + if handler: + handler(event) + # Optionally handle unknown events - subclasses can override this + else: + self._handle_unknown_event(event) + + def _handle_unknown_event(self, event: Event) -> None: + """Handle unknown event types. Override in subclasses if needed.""" + # Default: do nothing for unknown events + pass + + +class MinimalProgressVisualizer(EventHandlerMixin, ConversationVisualizerBase): + """A minimal progress visualizer that shows step counts and tool names. 
+ + This visualizer produces concise output showing: + - LLM call completions with cost and token information + - Tool execution steps with command/path details + - Agent thinking indicators + - Error messages + + Example output: + 🤖 LLM call completed (cost: $0.001234, tokens: prompt=100, + completion=50, total=150) + Step 1: Executing str_replace_editor (view: .../FACTS.txt)... ✓ + 💭 Agent thinking... + 🤖 LLM call completed (cost: $0.002345, tokens: prompt=200, + completion=100, total=300) + Step 2: Executing str_replace_editor (str_replace: .../FACTS.txt)... ✓ + """ def __init__(self, name: str | None = None): """Initialize the minimal progress visualizer. Args: name: Optional name to identify the agent/conversation. + Note: This simple visualizer doesn't use it in output, + but accepts it for compatibility with the base class. """ # Initialize parent - state will be set later via initialize() super().__init__(name=name) - def on_event(self, event: Event) -> None: - """Handle events for minimal progress visualization.""" - print(f"\n\n[EVENT] {type(event).__name__}: {event.model_dump_json()[:200]}...") - - -api_key = os.getenv("LLM_API_KEY") -assert api_key is not None, "LLM_API_KEY environment variable is not set." -model = os.getenv("LLM_MODEL", "openhands/claude-sonnet-4-5-20250929") -base_url = os.getenv("LLM_BASE_URL") -llm = LLM( - model=model, - api_key=SecretStr(api_key), - base_url=base_url, - usage_id="agent", -) -agent = get_default_agent(llm=llm, cli_mode=True) - -# ============================================================================ -# Configure Visualization -# ============================================================================ -# Set logging level to reduce verbosity -logging.getLogger().setLevel(logging.WARNING) - -# Start a conversation with custom visualizer -cwd = os.getcwd() -conversation = Conversation( - agent=agent, - workspace=cwd, - visualizer=MinimalVisualizer(), -) + # Track state for minimal progress output + self._event_counter = ( + 0 # Sequential counter for all events (LLM calls and tools) + ) + self._seen_llm_response_ids: set[str] = set() + # Track which response IDs we've already displayed metrics for + self._displayed_metrics_for_response_ids: set[str] = set() + # Track which token usages we've already seen (by response_id) + self._seen_token_usage_response_ids: set[str] = set() + # Track which costs we've already seen (by index) + self._seen_cost_count = 0 + + def _get_metrics_for_response_id( + self, response_id: str + ) -> tuple[float, dict] | None: + """Extract cost and token usage for a specific response_id. + + Gets metrics from conversation_stats, tracking incrementally to find + new metrics. + + Returns: + Tuple of (cost, token_info_dict) or None if not found. 
+ token_info_dict contains: prompt_tokens, completion_tokens, total_tokens + """ + # Get metrics from conversation stats (source of truth) + if not self.conversation_stats: + return None + + combined_metrics = self.conversation_stats.get_combined_metrics() + if not combined_metrics: + return None + + # Find token usage for this response_id that we haven't seen yet + token_usage = None + token_usage_index = None + for i, usage in enumerate(combined_metrics.token_usages): + if ( + usage.response_id == response_id + and usage.response_id not in self._seen_token_usage_response_ids + ): + token_usage = usage + token_usage_index = i + self._seen_token_usage_response_ids.add(usage.response_id) + break + + if not token_usage: + return None + + # Find the corresponding cost + # Costs and token_usages are added in the same order, but costs may be + # skipped if zero + cost = 0.0 + + # Look for new costs that we haven't seen yet + if ( + combined_metrics.costs + and len(combined_metrics.costs) > self._seen_cost_count + ): + # Get the cost at the same index as the token usage, or the most + # recent new cost + if token_usage_index is not None and token_usage_index < len( + combined_metrics.costs + ): + cost = combined_metrics.costs[token_usage_index].cost + self._seen_cost_count = max( + self._seen_cost_count, + token_usage_index + 1 if token_usage_index is not None else 0, + ) + else: + # Use the most recent cost if we have fewer costs than token usages + cost = combined_metrics.costs[-1].cost + self._seen_cost_count = len(combined_metrics.costs) + + return ( + cost, + { + "prompt_tokens": token_usage.prompt_tokens, + "completion_tokens": token_usage.completion_tokens, + "total_tokens": token_usage.prompt_tokens + + token_usage.completion_tokens, + }, + ) + + def _format_llm_call_line(self, response_id: str) -> str | None: + """Format LLM call line with cost and token information. + + Returns: + Formatted string or None if already displayed. + """ + if response_id in self._displayed_metrics_for_response_ids: + return None + + metrics_info = self._get_metrics_for_response_id(response_id) + if metrics_info: + cost, token_info = metrics_info + self._displayed_metrics_for_response_ids.add(response_id) + + # Format: "1. LLM call (tokens: 0000, cost $0.00)" + total_tokens = token_info["total_tokens"] + return f"LLM call (tokens: {total_tokens:04d}, cost ${cost:.2f})" + + # Fallback if metrics not available + self._displayed_metrics_for_response_ids.add(response_id) + return "LLM call (tokens: 0000, cost $0.00)" + + # Event handlers are now registered via decorators - no need for on_event override + + @handles(ActionEvent) + def _handle_action_event(self, event: ActionEvent) -> None: + """Handle ActionEvent - track LLM calls and show tool execution.""" + # Track LLM calls by monitoring new llm_response_id values + if ( + event.llm_response_id + and event.llm_response_id not in self._seen_llm_response_ids + ): + self._seen_llm_response_ids.add(event.llm_response_id) + # This is a new LLM call - show it + llm_line = self._format_llm_call_line(event.llm_response_id) + if llm_line: + self._event_counter += 1 + print(f"{self._event_counter}. 
{llm_line}", flush=True) + + # Show tool execution + self._event_counter += 1 + tool_name = event.tool_name if event.tool_name else "unknown" + + # Extract command/action details if available + command_str = "" + path_str = "" + if event.action: + action_dict = ( + event.action.model_dump() if hasattr(event.action, "model_dump") else {} + ) + if "command" in action_dict: + command_str = action_dict["command"] + if "path" in action_dict: + path_str = action_dict.get("path", "") + + # Format: "2. Tool: file_editor:view path" + if command_str and path_str: + tool_line = f"Tool: {tool_name}:{command_str} {path_str}" + elif command_str: + tool_line = f"Tool: {tool_name}:{command_str}" + else: + tool_line = f"Tool: {tool_name}" + + print(f"{self._event_counter}. {tool_line}", flush=True) + + @handles(ObservationEvent) + def _handle_observation_event(self, event: ObservationEvent) -> None: + """Handle ObservationEvent - no output needed.""" + _ = event # Event parameter required for handler signature + + @handles(AgentErrorEvent) + def _handle_error_event(self, event: AgentErrorEvent) -> None: + """Handle AgentErrorEvent - show errors.""" + self._event_counter += 1 + error_msg = event.error + # Truncate long error messages + error_preview = error_msg[:100] + "..." if len(error_msg) > 100 else error_msg + print(f"{self._event_counter}. Error: {error_preview}", flush=True) + + @handles(MessageEvent) + def _handle_message_event(self, event: MessageEvent) -> None: + """Handle MessageEvent - track LLM calls.""" + # Track LLM calls from MessageEvent (agent messages without tool calls) + if ( + event.source == "agent" + and event.llm_response_id + and event.llm_response_id not in self._seen_llm_response_ids + ): + self._seen_llm_response_ids.add(event.llm_response_id) + # This is a new LLM call - show it + llm_line = self._format_llm_call_line(event.llm_response_id) + if llm_line: + self._event_counter += 1 + print(f"{self._event_counter}. {llm_line}", flush=True) + + +def main(): + # ============================================================================ + # Configure LLM and Agent + # ============================================================================ + # You can get an API key from https://app.all-hands.dev/settings/api-keys + api_key = os.getenv("LLM_API_KEY") + assert api_key is not None, "LLM_API_KEY environment variable is not set." 
+ model = os.getenv("LLM_MODEL", "openhands/claude-sonnet-4-5-20250929") + base_url = os.getenv("LLM_BASE_URL") + llm = LLM( + model=model, + api_key=SecretStr(api_key), + base_url=base_url, + usage_id="agent", + ) + agent = get_default_agent(llm=llm, cli_mode=True) + + # ============================================================================ + # Configure Visualization + # ============================================================================ + # Set logging level to reduce verbosity + logging.getLogger().setLevel(logging.WARNING) + + # Create custom visualizer instance + minimal_visualizer = MinimalProgressVisualizer() + + # Start a conversation with custom visualizer + cwd = os.getcwd() + conversation = Conversation( + agent=agent, + workspace=cwd, + visualizer=minimal_visualizer, + ) + + # Send a message and let the agent run + print("Sending task to agent...") + conversation.send_message("Write 3 facts about the current project into FACTS.txt.") + conversation.run() + print("Task completed!") + + # Report final accumulated cost and tokens + final_metrics = llm.metrics + print("\n=== Final Summary ===") + print(f"Total Cost: ${final_metrics.accumulated_cost:.2f}") + if final_metrics.accumulated_token_usage: + usage = final_metrics.accumulated_token_usage + total_tokens = usage.prompt_tokens + usage.completion_tokens + print( + f"Total Tokens: prompt={usage.prompt_tokens}, " + f"completion={usage.completion_tokens}, " + f"total={total_tokens}" + ) -# Send a message and let the agent run -print("Sending task to agent...") -conversation.send_message("Write 3 facts about the current project into FACTS.txt.") -conversation.run() -print("Task completed!") -# Report cost -cost = llm.metrics.accumulated_cost -print(f"EXAMPLE_COST: ${cost:.4f}") +if __name__ == "__main__": + main() From fea0e303b8334ffcc142bc74357fb08b94f21077 Mon Sep 17 00:00:00 2001 From: John-Mason Shackelford Date: Thu, 6 Nov 2025 20:49:59 -0500 Subject: [PATCH 2/2] docs(examples): split custom visualizer into simple and detailed examples Split the custom visualizer example into two files: - 26_custom_visualizer.py: Simple visualizer showing only latency - 27_detailed_llm_metrics.py: Advanced visualizer with latency, cost, and token tracking Changes: - Simplified 26 to only display latency (removed cost/token tracking) - Created 27 with MetricsCache class for efficient metrics lookup - Improved formatting with right-aligned step numbers and event type labels - Renamed class in 27 from SimpleVisualizer to TokenCountingVisualizer - Added documentation explaining SDK metrics structure and cost matching The MetricsCache class demonstrates how to match costs to response_ids using ResponseLatency index, since Cost objects don't have response_id fields. --- .../01_standalone_sdk/26_custom_visualizer.py | 306 ++++++-------- .../27_detailed_llm_metrics.py | 374 ++++++++++++++++++ 2 files changed, 489 insertions(+), 191 deletions(-) create mode 100644 examples/01_standalone_sdk/27_detailed_llm_metrics.py diff --git a/examples/01_standalone_sdk/26_custom_visualizer.py b/examples/01_standalone_sdk/26_custom_visualizer.py index c5cf7cac2c..f87efa807a 100644 --- a/examples/01_standalone_sdk/26_custom_visualizer.py +++ b/examples/01_standalone_sdk/26_custom_visualizer.py @@ -1,18 +1,17 @@ """Custom Visualizer Example This example demonstrates how to create and use a custom visualizer by subclassing -ConversationVisualizer. This approach provides: +ConversationVisualizerBase. 
This approach provides: - Clean, testable code with class-based state management - Direct configuration (just pass the visualizer instance to visualizer parameter) - Reusable visualizer that can be shared across conversations -The MinimalProgressVisualizer produces concise output showing: -- LLM call completions with cost and token information +The SimpleVisualizer produces concise output showing: +- LLM call completions with latency information - Tool execution steps with command/path details -- Agent thinking indicators - Error messages -This demonstrates how you can pass a ConversationVisualizer instance directly +This demonstrates how you can pass a ConversationVisualizerBase instance directly to the visualizer parameter for clean, reusable visualization logic. """ @@ -29,13 +28,14 @@ AgentErrorEvent, Event, MessageEvent, - ObservationEvent, ) +from openhands.sdk.llm.utils.metrics import Metrics +from openhands.sdk.tool import Action from openhands.tools.preset.default import get_default_agent def handles(event_type: type[Event]): - """Decorator to register a method as an event handler.""" + """Decorator to register a method as an event handler for a specific event type.""" def decorator(func): func._handles_event_type = event_type @@ -44,59 +44,23 @@ def decorator(func): return decorator -class EventHandlerMixin: - """Mixin that provides event handler registration via decorators.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._event_handlers: dict[type[Event], Callable[[Event], None]] = {} - self._register_handlers() - - def _register_handlers(self): - """Automatically discover and register event handlers.""" - for attr_name in dir(self): - attr = getattr(self, attr_name) - if hasattr(attr, "_handles_event_type"): - event_type = attr._handles_event_type - self._event_handlers[event_type] = attr - - def on_event(self, event: Event) -> None: - """Dispatch events to registered handlers.""" - event_type = type(event) - handler = self._event_handlers.get(event_type) - if handler: - handler(event) - # Optionally handle unknown events - subclasses can override this - else: - self._handle_unknown_event(event) - - def _handle_unknown_event(self, event: Event) -> None: - """Handle unknown event types. Override in subclasses if needed.""" - # Default: do nothing for unknown events - pass - - -class MinimalProgressVisualizer(EventHandlerMixin, ConversationVisualizerBase): - """A minimal progress visualizer that shows step counts and tool names. +class SimpleVisualizer(ConversationVisualizerBase): + """A simple visualizer that shows step counts and tool names. This visualizer produces concise output showing: - - LLM call completions with cost and token information + - LLM call completions with latency information - Tool execution steps with command/path details - - Agent thinking indicators - Error messages Example output: - 🤖 LLM call completed (cost: $0.001234, tokens: prompt=100, - completion=50, total=150) - Step 1: Executing str_replace_editor (view: .../FACTS.txt)... ✓ - 💭 Agent thinking... - 🤖 LLM call completed (cost: $0.002345, tokens: prompt=200, - completion=100, total=300) - Step 2: Executing str_replace_editor (str_replace: .../FACTS.txt)... ✓ + 1. LLM: 2.3s + 2. Tool: file_editor:view /path/to/file.txt + 3. LLM: 1.5s + 4. Tool: file_editor:str_replace /path/to/file.txt """ def __init__(self, name: str | None = None): - """Initialize the minimal progress visualizer. + """Initialize the simple visualizer. 
Args: name: Optional name to identify the agent/conversation. @@ -107,182 +71,142 @@ def __init__(self, name: str | None = None): super().__init__(name=name) # Track state for minimal progress output - self._event_counter = ( - 0 # Sequential counter for all events (LLM calls and tools) - ) - self._seen_llm_response_ids: set[str] = set() - # Track which response IDs we've already displayed metrics for - self._displayed_metrics_for_response_ids: set[str] = set() - # Track which token usages we've already seen (by response_id) - self._seen_token_usage_response_ids: set[str] = set() - # Track which costs we've already seen (by index) - self._seen_cost_count = 0 - - def _get_metrics_for_response_id( - self, response_id: str - ) -> tuple[float, dict] | None: - """Extract cost and token usage for a specific response_id. - - Gets metrics from conversation_stats, tracking incrementally to find - new metrics. + self._event_counter = 0 # Sequential counter for all events + self._displayed_response_ids: set[str] = set() # Track displayed LLM calls + + # Register event handlers via decorators + self._event_handlers: dict[type[Event], Callable[[Event], None]] = {} + for attr_name in dir(self): + attr = getattr(self, attr_name) + if hasattr(attr, "_handles_event_type"): + self._event_handlers[attr._handles_event_type] = attr + + def _get_latency_for_response_id(self, response_id: str) -> float | None: + """Get latency for a specific response_id. + + The SDK provides `response_latencies` as a list of ResponseLatency objects, + each with a `response_id` field. We can directly look up by response_id. Returns: - Tuple of (cost, token_info_dict) or None if not found. - token_info_dict contains: prompt_tokens, completion_tokens, total_tokens + Latency in seconds, or None if not found. 
""" - # Get metrics from conversation stats (source of truth) if not self.conversation_stats: return None - combined_metrics = self.conversation_stats.get_combined_metrics() - if not combined_metrics: - return None + combined_metrics: Metrics = self.conversation_stats.get_combined_metrics() - # Find token usage for this response_id that we haven't seen yet - token_usage = None - token_usage_index = None - for i, usage in enumerate(combined_metrics.token_usages): - if ( - usage.response_id == response_id - and usage.response_id not in self._seen_token_usage_response_ids - ): - token_usage = usage - token_usage_index = i - self._seen_token_usage_response_ids.add(usage.response_id) - break - - if not token_usage: - return None + # Find ResponseLatency by response_id + for response_latency in combined_metrics.response_latencies: + if response_latency.response_id == response_id: + return response_latency.latency - # Find the corresponding cost - # Costs and token_usages are added in the same order, but costs may be - # skipped if zero - cost = 0.0 - - # Look for new costs that we haven't seen yet - if ( - combined_metrics.costs - and len(combined_metrics.costs) > self._seen_cost_count - ): - # Get the cost at the same index as the token usage, or the most - # recent new cost - if token_usage_index is not None and token_usage_index < len( - combined_metrics.costs - ): - cost = combined_metrics.costs[token_usage_index].cost - self._seen_cost_count = max( - self._seen_cost_count, - token_usage_index + 1 if token_usage_index is not None else 0, - ) - else: - # Use the most recent cost if we have fewer costs than token usages - cost = combined_metrics.costs[-1].cost - self._seen_cost_count = len(combined_metrics.costs) - - return ( - cost, - { - "prompt_tokens": token_usage.prompt_tokens, - "completion_tokens": token_usage.completion_tokens, - "total_tokens": token_usage.prompt_tokens - + token_usage.completion_tokens, - }, - ) + return None def _format_llm_call_line(self, response_id: str) -> str | None: - """Format LLM call line with cost and token information. + """Format LLM call line with latency information. Returns: Formatted string or None if already displayed. """ - if response_id in self._displayed_metrics_for_response_ids: + if response_id in self._displayed_response_ids: return None - metrics_info = self._get_metrics_for_response_id(response_id) - if metrics_info: - cost, token_info = metrics_info - self._displayed_metrics_for_response_ids.add(response_id) + self._displayed_response_ids.add(response_id) - # Format: "1. LLM call (tokens: 0000, cost $0.00)" - total_tokens = token_info["total_tokens"] - return f"LLM call (tokens: {total_tokens:04d}, cost ${cost:.2f})" + latency = self._get_latency_for_response_id(response_id) + if latency is not None: + return f"{'LLM:':>5} {latency:.1f}s" # Fallback if metrics not available - self._displayed_metrics_for_response_ids.add(response_id) - return "LLM call (tokens: 0000, cost $0.00)" + return f"{'LLM:':>5} 0.0s" - # Event handlers are now registered via decorators - no need for on_event override + def _format_tool_line(self, tool_name: str, action: Action) -> str: + """Format a tool execution line with command and path details. 
+ + Args: + tool_name: Name of the tool being executed + action: The Action object from the SDK + (may have 'command' and/or 'path' attributes) + + Returns: + Formatted tool line string + """ + # Extract command/action details from the action object + command_str = getattr(action, "command", "") + path_str = getattr(action, "path", "") + + if command_str and path_str: + return f"{'Tool:':>5} {tool_name}:{command_str} {path_str}" + elif command_str: + return f"{'Tool:':>5} {tool_name}:{command_str}" + else: + return f"{'Tool:':>5} {tool_name}" + + def on_event(self, event: Event) -> None: + """Dispatch events to registered handlers.""" + handler = self._event_handlers.get(type(event)) + if handler: + handler(event) @handles(ActionEvent) def _handle_action_event(self, event: ActionEvent) -> None: """Handle ActionEvent - track LLM calls and show tool execution.""" - # Track LLM calls by monitoring new llm_response_id values - if ( - event.llm_response_id - and event.llm_response_id not in self._seen_llm_response_ids - ): - self._seen_llm_response_ids.add(event.llm_response_id) - # This is a new LLM call - show it + # Show LLM call that generated this action event + # In the SDK, a single LLM response can generate multiple ActionEvents + # (parallel function calling). All ActionEvents from the same LLM response + # share the same llm_response_id. We show the LLM call once per response_id + # (deduplication handled by _format_llm_call_line), even if action is None + # (non-executable tool calls still have an associated LLM call). + if event.llm_response_id: llm_line = self._format_llm_call_line(event.llm_response_id) if llm_line: self._event_counter += 1 - print(f"{self._event_counter}. {llm_line}", flush=True) + print(f"{self._event_counter:>4}. {llm_line}", flush=True) + + # Skip tool execution if action is None (non-executable tool calls) + # Example: Agent tries to call a tool that doesn't exist (e.g., "missing_tool") + # The SDK creates an ActionEvent with action=None and then emits an + # AgentErrorEvent + if not event.action: + return # Show tool execution self._event_counter += 1 - tool_name = event.tool_name if event.tool_name else "unknown" - - # Extract command/action details if available - command_str = "" - path_str = "" - if event.action: - action_dict = ( - event.action.model_dump() if hasattr(event.action, "model_dump") else {} - ) - if "command" in action_dict: - command_str = action_dict["command"] - if "path" in action_dict: - path_str = action_dict.get("path", "") - - # Format: "2. Tool: file_editor:view path" - if command_str and path_str: - tool_line = f"Tool: {tool_name}:{command_str} {path_str}" - elif command_str: - tool_line = f"Tool: {tool_name}:{command_str}" - else: - tool_line = f"Tool: {tool_name}" - - print(f"{self._event_counter}. {tool_line}", flush=True) - - @handles(ObservationEvent) - def _handle_observation_event(self, event: ObservationEvent) -> None: - """Handle ObservationEvent - no output needed.""" - _ = event # Event parameter required for handler signature + tool_name = event.tool_name or "unknown" - @handles(AgentErrorEvent) - def _handle_error_event(self, event: AgentErrorEvent) -> None: - """Handle AgentErrorEvent - show errors.""" - self._event_counter += 1 - error_msg = event.error - # Truncate long error messages - error_preview = error_msg[:100] + "..." if len(error_msg) > 100 else error_msg - print(f"{self._event_counter}. 
Error: {error_preview}", flush=True) + tool_line = self._format_tool_line(tool_name, event.action) + print(f"{self._event_counter:>4}. {tool_line}", flush=True) @handles(MessageEvent) def _handle_message_event(self, event: MessageEvent) -> None: """Handle MessageEvent - track LLM calls.""" - # Track LLM calls from MessageEvent (agent messages without tool calls) - if ( - event.source == "agent" - and event.llm_response_id - and event.llm_response_id not in self._seen_llm_response_ids - ): - self._seen_llm_response_ids.add(event.llm_response_id) - # This is a new LLM call - show it + # Show LLM call for agent messages without tool calls + if event.source == "agent" and event.llm_response_id: llm_line = self._format_llm_call_line(event.llm_response_id) if llm_line: self._event_counter += 1 - print(f"{self._event_counter}. {llm_line}", flush=True) + print(f"{self._event_counter:>4}. {llm_line}", flush=True) + + def _truncate_error(self, error_msg: str, max_length: int = 100) -> str: + """Truncate error message if it exceeds max_length. + + Args: + error_msg: The error message to truncate + max_length: Maximum length before truncation + + Returns: + Truncated error message with "..." suffix if needed + """ + if len(error_msg) > max_length: + return error_msg[:max_length] + "..." + return error_msg + + @handles(AgentErrorEvent) + def _handle_error_event(self, event: AgentErrorEvent) -> None: + """Handle AgentErrorEvent - show errors.""" + self._event_counter += 1 + error_preview = self._truncate_error(event.error) + print(f"{self._event_counter:>4}. {'Error:':>5} {error_preview}", flush=True) def main(): @@ -309,14 +233,14 @@ def main(): logging.getLogger().setLevel(logging.WARNING) # Create custom visualizer instance - minimal_visualizer = MinimalProgressVisualizer() + simple_visualizer = SimpleVisualizer() # Start a conversation with custom visualizer cwd = os.getcwd() conversation = Conversation( agent=agent, workspace=cwd, - visualizer=minimal_visualizer, + visualizer=simple_visualizer, ) # Send a message and let the agent run diff --git a/examples/01_standalone_sdk/27_detailed_llm_metrics.py b/examples/01_standalone_sdk/27_detailed_llm_metrics.py new file mode 100644 index 0000000000..8518d2da8f --- /dev/null +++ b/examples/01_standalone_sdk/27_detailed_llm_metrics.py @@ -0,0 +1,374 @@ +"""Custom Visualizer Example - Detailed LLM Metrics + +This example builds on the custom visualizer example (26_custom_visualizer.py) by +showing how to compute and add detailed LLM metrics to the visualizer. +The TokenCountingVisualizer produces concise output showing: +- LLM call completions with latency, cost and token information +- Tool execution steps with command/path details +- Error messages + +This demonstrates how you can pass a ConversationVisualizer instance directly +to the visualizer parameter for clean, reusable visualization logic. 
+""" + +import logging +import os +from collections.abc import Callable + +from pydantic import SecretStr + +from openhands.sdk import LLM, Conversation +from openhands.sdk.conversation.visualizer import ConversationVisualizerBase +from openhands.sdk.event import ( + ActionEvent, + AgentErrorEvent, + Event, + MessageEvent, +) +from openhands.sdk.llm.utils.metrics import Metrics, TokenUsage +from openhands.sdk.tool import Action +from openhands.tools.preset.default import get_default_agent + + +def handles(event_type: type[Event]): + """Decorator to register a method as an event handler for a specific event type.""" + + def decorator(func): + func._handles_event_type = event_type + return func + + return decorator + + +class MetricsCache: + """Handles caching and lookup of metrics (latency, cost, tokens) by response_id. + + **What the SDK provides:** + The SDK provides metrics in separate lists via `Metrics`: + - `response_latencies`: List of ResponseLatency objects + (always added, has response_id) + - `token_usages`: List of TokenUsage objects (has response_id) + - `costs`: List of Cost objects (only added if cost > 0, no response_id field) + + These lists are added in the same order, but costs may be skipped if zero. + + **What we need to do ourselves:** + The SDK doesn't provide a direct way to get cost for a specific response_id because: + 1. Cost objects don't have a response_id field + 2. Costs may be skipped if zero, so indices don't align perfectly + + To match costs to response_ids, we use the ResponseLatency index since: + 1. ResponseLatency is always added (unlike costs which skip zeros) + 2. ResponseLatency has a response_id field for direct matching + 3. Costs and response_latencies are added in the same order + + This class builds a cache mapping response_id -> (latency, latency_index, + token_usage) and incrementally updates it as new metrics are added, + avoiding full rebuilds. + """ + + def __init__(self) -> None: + """Initialize the metrics cache.""" + # Cache: response_id -> (latency, latency_index, token_usage) + self._cache: dict[str, tuple[float, int, TokenUsage]] = {} + self._last_processed_count: int = 0 + + def get_metrics( + self, response_id: str, combined_metrics: Metrics + ) -> tuple[float, float, dict] | None: + """Get latency, cost, and token usage for a specific response_id. + + Args: + response_id: The response ID to look up + combined_metrics: The metrics object containing all metrics + + Returns: + Tuple of (latency, cost, token_info_dict) or None if not found. + token_info_dict contains: prompt_tokens, completion_tokens, total_tokens + """ + # Update cache if new entries have been added + self._update_cache(combined_metrics) + + # Lookup from cache + cached = self._cache.get(response_id) + if not cached: + return None + + latency, latency_index, token_usage = cached + + # Match cost using latency_index + # Since response_latencies and costs are added in the same order + # (with costs skipping zeros), we can use the latency_index to get the + # corresponding cost. 
+ cost = 0.0 + if latency_index >= 0 and combined_metrics.costs: + if latency_index < len(combined_metrics.costs): + cost = combined_metrics.costs[latency_index].cost + # If latency_index is beyond costs list, this response_id had zero + # cost (not recorded) + + return ( + latency, + cost, + { + "prompt_tokens": token_usage.prompt_tokens, + "completion_tokens": token_usage.completion_tokens, + "total_tokens": token_usage.prompt_tokens + + token_usage.completion_tokens, + }, + ) + + def _update_cache(self, combined_metrics: Metrics) -> None: + """Incrementally update cache by adding only new entries. + + Instead of rebuilding the entire cache each time, we only process new entries + that have been added since the last update. This is more efficient for + real-world execution where new LLM calls happen incrementally. + + Checks if new entries have been added and only updates if needed. + """ + current_count = len(combined_metrics.response_latencies) + len( + combined_metrics.token_usages + ) + if current_count <= self._last_processed_count: + return # No new entries, skip update + + # Build latency lookup for new entries + latency_map: dict[str, tuple[float, int]] = {} + for i, response_latency in enumerate(combined_metrics.response_latencies): + latency_map[response_latency.response_id] = (response_latency.latency, i) + + # Add new token_usages to cache + for token_usage in combined_metrics.token_usages: + response_id = token_usage.response_id + if response_id not in self._cache: + latency, latency_index = latency_map.get(response_id, (0.0, -1)) + self._cache[response_id] = (latency, latency_index, token_usage) + + self._last_processed_count = current_count + + +# ============================================================================ +# Custom Visualizer +# ============================================================================ +class TokenCountingVisualizer(ConversationVisualizerBase): + """A visualizer that shows step counts, tool names, and detailed LLM metrics. + + This visualizer produces concise output showing: + - LLM call completions with latency, cost and token information + - Tool execution steps with command/path details + - Error messages + + Example output: + 1. LLM: 2.3s, tokens: 0150, cost $0.00 + 2. Tool: file_editor:view /path/to/file.txt + 3. LLM: 1.5s, tokens: 0300, cost $0.01 + 4. Tool: file_editor:str_replace /path/to/file.txt + """ + + def __init__(self, name: str | None = None): + """Initialize the token counting visualizer. + + Args: + name: Optional name to identify the agent/conversation. + Note: This visualizer doesn't use it in output, + but accepts it for compatibility with the base class. + """ + # Initialize parent - state will be set later via initialize() + super().__init__(name=name) + + # Track state for minimal progress output + self._event_counter = 0 # Sequential counter for all events + self._displayed_response_ids: set[str] = set() # Track displayed LLM calls + self._metrics_cache = MetricsCache() # Handles metrics caching and lookups + + # Register event handlers via decorators + self._event_handlers: dict[type[Event], Callable[[Event], None]] = {} + for attr_name in dir(self): + attr = getattr(self, attr_name) + if hasattr(attr, "_handles_event_type"): + self._event_handlers[attr._handles_event_type] = attr + + def _get_metrics_for_response_id( + self, response_id: str + ) -> tuple[float, float, dict] | None: + """Extract latency, cost, and token usage for a specific response_id. 
+ + Returns: + Tuple of (latency, cost, token_info_dict) or None if not found. + token_info_dict contains: prompt_tokens, completion_tokens, total_tokens + """ + if not self.conversation_stats: + return None + + combined_metrics: Metrics = self.conversation_stats.get_combined_metrics() + return self._metrics_cache.get_metrics(response_id, combined_metrics) + + def _format_llm_call_line(self, response_id: str) -> str | None: + """Format LLM call line with cost and token information. + + Returns: + Formatted string or None if already displayed. + """ + if response_id in self._displayed_response_ids: + return None + + self._displayed_response_ids.add(response_id) + + metrics_info = self._get_metrics_for_response_id(response_id) + if metrics_info: + latency, cost, token_info = metrics_info + total_tokens = token_info["total_tokens"] + return ( + f"{'LLM:':>5} {latency:.1f}s, tokens: {total_tokens:04d}, " + f"cost ${cost:.2f}" + ) + + # Fallback if metrics not available + return f"{'LLM:':>5} 0.0s, tokens: 0000, cost $0.00" + + def _format_tool_line(self, tool_name: str, action: Action) -> str: + """Format a tool execution line with command and path details. + + Args: + tool_name: Name of the tool being executed + action: The Action object from the SDK + (may have 'command' and/or 'path' attributes) + + Returns: + Formatted tool line string + """ + # Extract command/action details from the action object + command_str = getattr(action, "command", "") + path_str = getattr(action, "path", "") + + if command_str and path_str: + return f"{'Tool:':>5} {tool_name}:{command_str} {path_str}" + elif command_str: + return f"{'Tool:':>5} {tool_name}:{command_str}" + else: + return f"{'Tool:':>5} {tool_name}" + + def on_event(self, event: Event) -> None: + """Dispatch events to registered handlers.""" + handler = self._event_handlers.get(type(event)) + if handler: + handler(event) + + @handles(ActionEvent) + def _handle_action_event(self, event: ActionEvent) -> None: + """Handle ActionEvent - track LLM calls and show tool execution.""" + # Show LLM call that generated this action event + # In the SDK, a single LLM response can generate multiple ActionEvents + # (parallel function calling). All ActionEvents from the same LLM response + # share the same llm_response_id. We show the LLM call once per response_id + # (deduplication handled by _format_llm_call_line), even if action is None + # (non-executable tool calls still have an associated LLM call). + if event.llm_response_id: + llm_line = self._format_llm_call_line(event.llm_response_id) + if llm_line: + self._event_counter += 1 + print(f"{self._event_counter:>4}. {llm_line}", flush=True) + + # Skip tool execution if action is None (non-executable tool calls) + # Example: Agent tries to call a tool that doesn't exist (e.g., "missing_tool") + # The SDK creates an ActionEvent with action=None and then emits an + # AgentErrorEvent + if not event.action: + return + + # Show tool execution + self._event_counter += 1 + tool_name = event.tool_name or "unknown" + + tool_line = self._format_tool_line(tool_name, event.action) + print(f"{self._event_counter:>4}. 
{tool_line}", flush=True) + + @handles(MessageEvent) + def _handle_message_event(self, event: MessageEvent) -> None: + """Handle MessageEvent - track LLM calls.""" + # Show LLM call for agent messages without tool calls + if event.source == "agent" and event.llm_response_id: + llm_line = self._format_llm_call_line(event.llm_response_id) + if llm_line: + self._event_counter += 1 + print(f"{self._event_counter:>4}. {llm_line}", flush=True) + + def _truncate_error(self, error_msg: str, max_length: int = 100) -> str: + """Truncate error message if it exceeds max_length. + + Args: + error_msg: The error message to truncate + max_length: Maximum length before truncation + + Returns: + Truncated error message with "..." suffix if needed + """ + if len(error_msg) > max_length: + return error_msg[:max_length] + "..." + return error_msg + + @handles(AgentErrorEvent) + def _handle_error_event(self, event: AgentErrorEvent) -> None: + """Handle AgentErrorEvent - show errors.""" + self._event_counter += 1 + error_preview = self._truncate_error(event.error) + print(f"{self._event_counter:>4}. {'Error:':>5} {error_preview}", flush=True) + + +def main(): + # ============================================================================ + # Configure LLM and Agent + # ============================================================================ + # You can get an API key from https://app.all-hands.dev/settings/api-keys + api_key = os.getenv("LLM_API_KEY") + assert api_key is not None, "LLM_API_KEY environment variable is not set." + model = os.getenv("LLM_MODEL", "openhands/claude-sonnet-4-5-20250929") + base_url = os.getenv("LLM_BASE_URL") + llm = LLM( + model=model, + api_key=SecretStr(api_key), + base_url=base_url, + usage_id="agent", + ) + agent = get_default_agent(llm=llm, cli_mode=True) + + # ============================================================================ + # Configure Visualization + # ============================================================================ + # Set logging level to reduce verbosity + logging.getLogger().setLevel(logging.WARNING) + + # Create custom visualizer instance + token_counting_visualizer = TokenCountingVisualizer() + + # Start a conversation with custom visualizer + cwd = os.getcwd() + conversation = Conversation( + agent=agent, + workspace=cwd, + visualizer=token_counting_visualizer, + ) + + # Send a message and let the agent run + print("Sending task to agent...") + conversation.send_message("Write 3 facts about the current project into FACTS.txt.") + conversation.run() + print("Task completed!") + + # Report final accumulated cost and tokens + final_metrics = llm.metrics + print("\n=== Final Summary ===") + print(f"Total Cost: ${final_metrics.accumulated_cost:.2f}") + if final_metrics.accumulated_token_usage: + usage = final_metrics.accumulated_token_usage + total_tokens = usage.prompt_tokens + usage.completion_tokens + print( + f"Total Tokens: prompt={usage.prompt_tokens}, " + f"completion={usage.completion_tokens}, " + f"total={total_tokens}" + ) + + +if __name__ == "__main__": + main()