From e0da6842d5fd20e38f4c8b72b6f4b6f39710f073 Mon Sep 17 00:00:00 2001 From: matdev83 <211248003+matdev83@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:11:00 +0100 Subject: [PATCH] Refactor hybrid connector into modular package --- src/connectors/hybrid.py | 2166 ----------------- src/connectors/hybrid/__init__.py | 12 + src/connectors/hybrid/connector.py | 494 ++++ src/connectors/hybrid/constants.py | 4 + src/connectors/hybrid/logging_utils.py | 20 + src/connectors/hybrid/message_augmentation.py | 105 + src/connectors/hybrid/model_spec.py | 139 ++ src/connectors/hybrid/parameters.py | 150 ++ src/connectors/hybrid/phases.py | 522 ++++ src/connectors/hybrid/reasoning_markup.py | 106 + src/connectors/hybrid/request_preparation.py | 79 + src/connectors/hybrid/response_filtering.py | 387 +++ src/connectors/hybrid/types.py | 25 + src/connectors/hybrid/utils.py | 66 + 14 files changed, 2109 insertions(+), 2166 deletions(-) delete mode 100644 src/connectors/hybrid.py create mode 100644 src/connectors/hybrid/__init__.py create mode 100644 src/connectors/hybrid/connector.py create mode 100644 src/connectors/hybrid/constants.py create mode 100644 src/connectors/hybrid/logging_utils.py create mode 100644 src/connectors/hybrid/message_augmentation.py create mode 100644 src/connectors/hybrid/model_spec.py create mode 100644 src/connectors/hybrid/parameters.py create mode 100644 src/connectors/hybrid/phases.py create mode 100644 src/connectors/hybrid/reasoning_markup.py create mode 100644 src/connectors/hybrid/request_preparation.py create mode 100644 src/connectors/hybrid/response_filtering.py create mode 100644 src/connectors/hybrid/types.py create mode 100644 src/connectors/hybrid/utils.py diff --git a/src/connectors/hybrid.py b/src/connectors/hybrid.py deleted file mode 100644 index e1b0f9fe..00000000 --- a/src/connectors/hybrid.py +++ /dev/null @@ -1,2166 +0,0 @@ -""" -Hybrid backend connector - orchestrates two-phase LLM interactions. - -This connector implements a hybrid approach where: -1. A reasoning model generates chain-of-thought reasoning -2. The reasoning is captured and injected into the execution model's context -3. The execution model generates the final response with enhanced context -""" - -from __future__ import annotations - -import asyncio -import contextlib -import json -import logging -import random -import re -import time -import uuid -from copy import deepcopy -from dataclasses import asdict, dataclass, is_dataclass -from typing import TYPE_CHECKING, Any, cast - -import httpx - -from src.connectors.base import LLMBackend -from src.connectors.utils.model_capabilities import ( - get_execution_params, - get_reasoning_params, - get_reasoning_tags, - supports_system_messages, -) -from src.connectors.utils.reasoning_stream_processor import ( - ReasoningStreamProcessor, -) -from src.core.app.constants.logging_constants import TRACE_LEVEL -from src.core.common.exceptions import ( - AuthenticationError, - BackendError, - ConfigurationError, - ServiceResolutionError, -) -from src.core.config.app_config import AppConfig -from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope -from src.core.interfaces.configuration_interface import IAppIdentityConfig -from src.core.interfaces.model_bases import DomainModel, InternalDTO -from src.core.interfaces.response_processor_interface import ProcessedResponse -from src.core.services.backend_registry import backend_registry - -if TYPE_CHECKING: - from src.core.domain.chat import CanonicalChatRequest - from src.core.services.backend_registry import BackendRegistry - from src.core.services.translation_service import TranslationService - -logger = logging.getLogger(__name__) - -# Timeout constants -REASONING_PHASE_TIMEOUT = 60.0 # seconds -EXECUTION_PHASE_TIMEOUT = 120.0 # seconds - - -@dataclass -class ReasoningPhaseResult: - """Container for reasoning phase outcome.""" - - text: str - complete: bool - tool_calls: list[dict[str, Any]] - raw_chunks: list[ProcessedResponse] - media_type: str | None - headers: dict[str, str] | None - - def has_tool_calls(self) -> bool: - """Check whether reasoning produced any tool calls.""" - - return bool(self.tool_calls) - - -class HybridConnector(LLMBackend): - """LLMBackend implementation for hybrid two-phase reasoning approach.""" - - backend_type: str = "hybrid" - _LEADING_REASONING_TAG = re.compile( - r"^\s*<\s*(?:think|thinking|reason|reasoning)\b[^>]*>\s*", re.IGNORECASE - ) - _TRAILING_REASONING_TAG = re.compile( - r"\s*<\s*/\s*(?:think|thinking|reason|reasoning)\b[^>]*>\s*$", - re.IGNORECASE, - ) - - def __init__( - self, - client: httpx.AsyncClient, - config: AppConfig, - translation_service: TranslationService, - backend_registry: BackendRegistry | None = None, - ) -> None: - """Initialize the hybrid connector. - - Args: - client: HTTP client for API calls - config: Application configuration - translation_service: Service for translating between formats - backend_registry: Registry to resolve backend connectors - """ - super().__init__(config=config) - self.client = client - self.config = config - self.translation_service = translation_service - self._backend_registry = backend_registry - - async def initialize(self, **kwargs: Any) -> None: - """Initialize the hybrid backend. - - Note: - Reasoning and execution models are specified per-request in the model field, - not during initialization. - - Args: - **kwargs: Additional configuration (unused for hybrid backend) - - Raises: - ConfigurationError: If hybrid backend is disabled in configuration - """ - # Import backend_registry if not provided in constructor - if self._backend_registry is None: - from src.core.services.backend_registry import backend_registry - - self._backend_registry = backend_registry - - # Check if hybrid backend is disabled - if ( - hasattr(self.config, "backends") - and hasattr(self.config.backends, "disable_hybrid_backend") - and self.config.backends.disable_hybrid_backend - ): - logger.warning("Hybrid backend is disabled in configuration") - - logger.info("Hybrid backend initialized successfully") - - def _parse_hybrid_model_spec( - self, model_spec: str - ) -> tuple[str, str, dict[str, Any], str, str, dict[str, Any]]: - """Parse hybrid model specification with optional URI parameters. - - Args: - model_spec: Format "hybrid:[reasoning-backend:reasoning-model?params,execution-backend:execution-model?params]" - Example: "hybrid:[minimax:MiniMax-M2?temperature=0.8,qwen-oauth:qwen3-coder-plus?temperature=0.3]" - - Returns: - Tuple of (reasoning_backend, reasoning_model, reasoning_params, - execution_backend, execution_model, execution_params) - - Raises: - ValueError: If format is invalid or incomplete with descriptive messages and examples - """ - from src.core.domain.model_utils import parse_model_with_params - - # Remove "hybrid:" prefix if present - if model_spec.startswith("hybrid:"): - model_spec = model_spec[7:] - - # Check for brackets - if not model_spec.startswith("[") or not model_spec.endswith("]"): - raise ValueError( - "Invalid hybrid model format. Expected: hybrid:[reasoning-backend:reasoning-model,execution-backend:execution-model]. " - "Example: hybrid:[minimax:MiniMax-M2,qwen-oauth:qwen3-coder-plus]" - ) - - # Remove brackets - model_spec = model_spec[1:-1] - - # Split by comma - need to be careful with commas in query strings - # Strategy: Split by comma, but track if we're inside a query string - # A comma inside a query string (after ?) should not split the models - # We need to find the comma that separates the two model specs - parts = [] - current_part: list[str] = [] - - i = 0 - while i < len(model_spec): - char = model_spec[i] - - # Check if this is a comma that separates models - # It should be a comma that's not part of a query string - if char == ",": - # Look back to see if we're in a query string - # A comma is a separator if there's no '?' before it in the current part - # or if there's a complete backend:model before it - current_str = "".join(current_part) - - # Check if we have a complete model spec (backend:model with optional ?params) - # by checking if there's a colon before any question mark - has_colon = ":" in current_str - current_str.find("?") - - if has_colon: - # This looks like a complete model spec, so this comma is a separator - parts.append(current_str) - current_part = [] - i += 1 - continue - - current_part.append(char) - i += 1 - - # Add the last part - if current_part: - parts.append("".join(current_part)) - - if len(parts) != 2: - raise ValueError( - f"Invalid hybrid model format. Expected exactly 2 models separated by comma, got {len(parts)}. " - "Expected: hybrid:[reasoning-backend:reasoning-model,execution-backend:execution-model]. " - "Example: hybrid:[minimax:MiniMax-M2,qwen-oauth:qwen3-coder-plus]" - ) - - reasoning_spec = parts[0].strip() - execution_spec = parts[1].strip() - - # Parse reasoning model spec with URI parameters - try: - reasoning_backend, reasoning_model, reasoning_params = ( - parse_model_with_params(reasoning_spec) - ) - except Exception as e: - # Log warning about parsing failure but provide helpful error message - logger.warning( - f"Failed to parse reasoning model specification '{reasoning_spec}': {e}. " - f"Attempting to continue with fallback parsing." - ) - raise ValueError( - f"Invalid reasoning model specification: '{reasoning_spec}'. " - f"Error: {e}. " - "Expected format: backend:model or backend:model?params. " - "Example: minimax:MiniMax-M2?temperature=0.8" - ) from e - - reasoning_backend = reasoning_backend.strip() - reasoning_model = reasoning_model.strip() - - if not reasoning_backend or not reasoning_model: - raise ValueError( - f"Incomplete reasoning model specification: '{reasoning_spec}'. " - "Both backend and model must be non-empty. " - "Example: minimax:MiniMax-M2" - ) - - # Parse execution model spec with URI parameters - try: - execution_backend, execution_model, execution_params = ( - parse_model_with_params(execution_spec) - ) - except Exception as e: - # Log warning about parsing failure but provide helpful error message - logger.warning( - f"Failed to parse execution model specification '{execution_spec}': {e}. " - f"Attempting to continue with fallback parsing." - ) - raise ValueError( - f"Invalid execution model specification: '{execution_spec}'. " - f"Error: {e}. " - "Expected format: backend:model or backend:model?params. " - "Example: qwen-oauth:qwen3-coder-plus?temperature=0.3" - ) from e - - execution_backend = execution_backend.strip() - execution_model = execution_model.strip() - - if not execution_backend or not execution_model: - raise ValueError( - f"Incomplete execution model specification: '{execution_spec}'. " - "Both backend and model must be non-empty. " - "Example: qwen-oauth:qwen3-coder-plus" - ) - - logger.debug( - f"Parsed hybrid model spec: reasoning={reasoning_backend}:{reasoning_model} (params={reasoning_params}), " - f"execution={execution_backend}:{execution_model} (params={execution_params})" - ) - - return ( - reasoning_backend, - reasoning_model, - reasoning_params, - execution_backend, - execution_model, - execution_params, - ) - - def _apply_reasoning_params( - self, - request_data: DomainModel | InternalDTO | dict[str, Any], - backend_or_params: str | dict[str, Any], - enable_reasoning: bool | None = None, - ) -> DomainModel | InternalDTO | dict[str, Any]: - """Apply backend-specific reasoning parameters to the request. - - This method supports both legacy usage where a backend name is provided - alongside the ``enable_reasoning`` flag, as well as direct parameter - dictionaries (used in certain integration tests). - """ - - if isinstance(backend_or_params, str): - if enable_reasoning is None: - raise TypeError( - "enable_reasoning flag is required when backend name is provided" - ) - params = ( - get_reasoning_params(backend_or_params) - if enable_reasoning - else get_execution_params(backend_or_params) - ) - elif isinstance(backend_or_params, dict): - params = backend_or_params - else: - raise TypeError( - "backend_or_params must be a backend string or parameter dictionary" - ) - - return self._apply_parameter_overrides(request_data, params) - - def _apply_parameter_overrides( - self, - request_data: DomainModel | InternalDTO | dict[str, Any], - params: dict[str, Any], - ) -> DomainModel | InternalDTO | dict[str, Any]: - """Apply a parameter dictionary to the request data.""" - - # If no parameters to override, return original - if not params: - return request_data - - # Log the overrides - for key, value in params.items(): - logger.debug(f"Applying override {key}={value} to request") - - # Handle Pydantic models (includes CanonicalChatRequest) - if isinstance(request_data, DomainModel): - # Ensure extra_body is a mutable dict - current_extra_body = getattr(request_data, "extra_body", None) - new_extra_body = dict(current_extra_body) if current_extra_body else {} - - # Apply overrides - new_extra_body.update(params) - - # Strip hybrid routing hints that would confuse downstream connectors - for drop_key in ("backend_type", "model"): - new_extra_body.pop(drop_key, None) - - return request_data.model_copy( - update={ - "extra_body": new_extra_body if new_extra_body else None, - **params, - } - ) - - # Handle dicts - elif isinstance(request_data, dict): - request_copy = dict(request_data) - # Ensure extra_body exists and is a mutable dict - current_extra_body = request_copy.get("extra_body") - new_extra_body = ( - dict(current_extra_body) if isinstance(current_extra_body, dict) else {} - ) - - # Apply overrides - new_extra_body.update(params) - for drop_key in ("backend_type", "model"): - new_extra_body.pop(drop_key, None) - request_copy["extra_body"] = new_extra_body if new_extra_body else None - - # Expose overrides at the top level for compatibility - request_copy.update(params) - - return request_copy - - # Handle dataclasses - elif is_dataclass(request_data) and not isinstance(request_data, type): - request_dict = asdict(request_data) - # Ensure extra_body exists and is a mutable dict - current_extra_body = request_dict.get("extra_body") - new_extra_body = ( - dict(current_extra_body) if isinstance(current_extra_body, dict) else {} - ) - - # Apply overrides - new_extra_body.update(params) - for drop_key in ("backend_type", "model"): - new_extra_body.pop(drop_key, None) - request_dict["extra_body"] = new_extra_body if new_extra_body else None - - # Merge overrides into the dataclass representation - request_dict.update(params) - - # Return as dict since we can't easily reconstruct the dataclass - return request_dict - # Fallback: return original if type is not supported - logger.warning( - f"Unsupported request_data type in _apply_reasoning_params: {type(request_data).__name__}" - ) - return request_data - - def _resolve_backend_identity( - self, - backend: str, - request_identity: IAppIdentityConfig | None, - backend_config: Any = None, - ) -> IAppIdentityConfig | None: - """Resolve identity configuration for backend calls. - - Preference order: - 1. Backend-specific identity provided via backend_config or AppConfig.backends - 2. Identity attached to the current request - 3. Global application identity - """ - - if backend_config is not None and getattr(backend_config, "identity", None): - return cast(IAppIdentityConfig, backend_config.identity) - - backend_identity = None - if hasattr(self.config, "backends"): - with contextlib.suppress(AttributeError): - backend_settings = getattr(self.config.backends, backend) - backend_identity = getattr(backend_settings, "identity", None) - if backend_identity is not None: - return cast(IAppIdentityConfig, backend_identity) - - if request_identity is not None: - return request_identity - - return getattr(self.config, "identity", None) - - def get_reasoning_params(self, backend: str = "openai") -> dict[str, Any]: - """Expose reasoning parameter presets for tests and diagnostics.""" - - return get_reasoning_params(backend) - - def get_execution_params(self, backend: str = "openai") -> dict[str, Any]: - """Expose execution parameter presets for tests and diagnostics.""" - - return get_execution_params(backend) - - def _supports_system_messages(self, backend: str) -> bool: - """Check if backend supports system messages. - - Args: - backend: Backend name - - Returns: - True if backend supports system role messages - """ - return supports_system_messages(backend) - - @staticmethod - def _assemble_reasoning_markup( - opening_tag: str, closing_tag: str, body: str - ) -> str: - """Rebuild reasoning text with canonical tags.""" - - if not body: - return f"{opening_tag}{closing_tag}" - - if "\n" in body or body.startswith("<"): - return f"{opening_tag}\n{body}\n{closing_tag}" - - return f"{opening_tag}{body}{closing_tag}" - - def _normalize_reasoning_markup( - self, reasoning_output: str, opening_tag: str, closing_tag: str - ) -> str: - """Normalize reasoning markup to use canonical tags and ensure closure.""" - - truncated = self._truncate_after_reasoning_close(reasoning_output) - stripped = truncated.strip() - if not stripped: - return stripped - - leading_match = self._LEADING_REASONING_TAG.match(stripped) - body_start = leading_match.end() if leading_match else 0 - body_section = stripped[body_start:] - - trailing_match = self._TRAILING_REASONING_TAG.search(body_section) - if trailing_match: - body_end = trailing_match.start() - body_section = body_section[:body_end] - - body = body_section.strip() - return self._assemble_reasoning_markup(opening_tag, closing_tag, body) - - def _apply_reasoning_tag_wrapping( - self, reasoning_output: str, opening_tag: str, closing_tag: str - ) -> str: - """Wrap or normalize reasoning output using backend-specific tags.""" - - return self._normalize_reasoning_markup( - reasoning_output, opening_tag, closing_tag - ) - - @staticmethod - def _extract_reasoning_inner_text(text: str) -> str: - """Strip XML-like tags and return inner text for reasoning payloads.""" - - if not text: - return "" - - return re.sub(r"<[^>]+>", "", text).strip() - - def _has_reasoning_content(self, formatted_reasoning: str) -> bool: - """Determine whether the formatted reasoning contains substantive text.""" - - return bool(self._extract_reasoning_inner_text(formatted_reasoning)) - - def _prepare_reasoning_texts( - self, reasoning_output: str, backend: str - ) -> tuple[str, str]: - """Return backend-tagged reasoning and plain text representations.""" - - if not reasoning_output: - return "", "" - - opening_tag, closing_tag = get_reasoning_tags(backend) - tagged = self._apply_reasoning_tag_wrapping( - reasoning_output, opening_tag, closing_tag - ).strip() - - plain = self._extract_reasoning_inner_text(tagged) - if not plain: - return "", "" - - return tagged, plain - - def _format_reasoning_for_model(self, reasoning_output: str, backend: str) -> str: - """Format reasoning with model-specific tags. - - Args: - reasoning_output: Raw reasoning text - backend: Backend name for format selection - - Returns: - Formatted reasoning with appropriate tags - """ - tagged, plain = self._prepare_reasoning_texts(reasoning_output, backend) - return tagged if plain else "" - - def _inject_as_system_message( - self, messages: list, reasoning_output: str, execution_backend: str - ) -> list: - """Inject reasoning as system message. - - Args: - messages: Original message list - reasoning_output: Captured reasoning text - execution_backend: Backend name for tag formatting - - Returns: - Messages with reasoning in system message - """ - import copy - - messages_copy = copy.deepcopy(messages) - - # Format reasoning with appropriate tags - formatted_reasoning = self._format_reasoning_for_model( - reasoning_output, execution_backend - ) - if not formatted_reasoning: - return messages_copy - - # Create system message content - system_content = ( - "Consider this reasoning when formulating your response:\n\n" - f"{formatted_reasoning}" - ) - - # Check if there's already a system message - has_system_message = False - for i, msg in enumerate(messages_copy): - if isinstance(msg, dict) and msg.get("role") == "system": - # Augment existing system message - messages_copy[i]["content"] = f"{msg['content']}\n\n{system_content}" - has_system_message = True - break - - # If no system message exists, create one at the beginning - if not has_system_message: - system_message = {"role": "system", "content": system_content} - messages_copy.insert(0, system_message) - - return messages_copy - - def _inject_to_user_message( - self, messages: list, reasoning_output: str, execution_backend: str - ) -> list: - """Inject reasoning as prefix to first user message. - - Args: - messages: Original message list - reasoning_output: Captured reasoning text - execution_backend: Backend name for tag formatting - - Returns: - Messages with reasoning prepended to first user message - """ - import copy - - messages_copy = copy.deepcopy(messages) - - # Format reasoning with appropriate tags - formatted_reasoning = self._format_reasoning_for_model( - reasoning_output, execution_backend - ) - if not formatted_reasoning: - return messages_copy - - # Find first user message - for i, msg in enumerate(messages_copy): - if isinstance(msg, dict) and msg.get("role") == "user": - # Prepend reasoning to user message - original_content = msg.get("content", "") - messages_copy[i][ - "content" - ] = f"{formatted_reasoning}\n\n{original_content}" - break - - return messages_copy - - def _augment_messages( - self, messages: list, reasoning_output: str, execution_backend: str - ) -> list: - """Augment messages with reasoning using adaptive placement strategy. - - Args: - messages: Original message list - reasoning_output: Captured reasoning text - execution_backend: Backend name to determine capability - - Returns: - New message list with reasoning injected appropriately - """ - # Handle edge case: empty messages - if not messages: - logger.warning("Empty message list provided for augmentation") - return messages - - # Check if execution backend supports system messages - if self._supports_system_messages(execution_backend): - # Primary strategy: inject as system message - logger.debug(f"Using system message injection for {execution_backend}") - augmented_messages = self._inject_as_system_message( - messages, reasoning_output, execution_backend - ) - else: - # Fallback strategy: inject to user message - logger.debug(f"Using user message prefix injection for {execution_backend}") - augmented_messages = self._inject_to_user_message( - messages, reasoning_output, execution_backend - ) - - if self.config.backends.hybrid_backend_repeat_messages: - formatted_reasoning = self._format_reasoning_for_model( - reasoning_output, execution_backend - ) - if formatted_reasoning: - augmented_messages.append( - {"role": "assistant", "content": formatted_reasoning} - ) - return augmented_messages - - def _strip_reasoning_tags(self, content: str) -> str: - """Strip reasoning tags from content. - - Args: - content: Content that may contain reasoning tags - - Returns: - Content with reasoning tags and their content removed - """ - import re - - # Define all possible reasoning tag patterns - reasoning_patterns = [ - r".*?", - r".*?", - r".*?", - r".*?", - ] - - # Remove all reasoning tag patterns (case-insensitive, dotall for multiline) - cleaned_content = content - for pattern in reasoning_patterns: - cleaned_content = re.sub( - pattern, "", cleaned_content, flags=re.DOTALL | re.IGNORECASE - ) - - # Also remove the instruction prefix if present - instruction_pattern = ( - r"Consider this reasoning when formulating your response:\s*" - ) - cleaned_content = re.sub( - instruction_pattern, "", cleaned_content, flags=re.IGNORECASE - ) - - return cleaned_content - - def _filter_response_content(self, content: Any) -> Any: - """Filter reasoning tags from response content. - - This method handles various content types and ensures reasoning - tags are removed from all parts of the response, including tool calls. - - Args: - content: Response content (can be string, dict, or bytes) - - Returns: - Filtered content with reasoning tags removed - """ - - # Handle bytes content (SSE chunks) - if isinstance(content, bytes): - try: - content_str = content.decode("utf-8") - except UnicodeDecodeError: - # If we can't decode, return as-is - return content - elif isinstance(content, str): - content_str = content - elif isinstance(content, dict): - return self._filter_json_content(content) - elif isinstance(content, list): - return [self._filter_response_content(item) for item in content] - else: - # For other types, return as-is - return content - - # Check if this is an SSE data line - if content_str.startswith("data: "): - data_part = content_str[6:].strip() - - # Skip [DONE] markers - if data_part == "[DONE]": - return content - - try: - # Parse the JSON data - data = json.loads(data_part) - - # Filter the JSON payload recursively - cleaned = self._filter_json_content(data) - - # Reconstruct the SSE line - filtered_data = json.dumps(cleaned, ensure_ascii=False) - return ( - f"data: {filtered_data}\n\n".encode() - if isinstance(content, bytes) - else f"data: {filtered_data}\n\n" - ) - - except json.JSONDecodeError: - # If we can't parse JSON, just strip tags from the string - filtered_str = self._strip_reasoning_tags(content_str) - return ( - filtered_str.encode("utf-8") - if isinstance(content, bytes) - else filtered_str - ) - - # For non-SSE content, just strip tags - filtered_str = self._strip_reasoning_tags(content_str) - return ( - filtered_str.encode("utf-8") if isinstance(content, bytes) else filtered_str - ) - - def _filter_json_content(self, data: Any) -> Any: - """Recursively remove reasoning content from JSON-like structures.""" - - if isinstance(data, dict): - filtered: dict[str, Any] = {} - for key, value in data.items(): - if key == "reasoning_content": - continue - filtered[key] = self._filter_json_content(value) - return filtered - - if isinstance(data, list): - return [self._filter_json_content(item) for item in data] - - if isinstance(data, str): - return self._strip_reasoning_tags(data) - - return data - - async def _filter_response_stream( - self, response: StreamingResponseEnvelope - ) -> StreamingResponseEnvelope: - """Filter reasoning tags from streaming response. - - Args: - response: Original streaming response from execution model - - Returns: - Filtered streaming response with reasoning tags removed - """ - - async def filtered_stream(): - """Generator that filters each chunk of the response stream.""" - if response.content is None: - return - - async for chunk in response.content: - # Filter the content - filtered_content = self._filter_response_content(chunk.content) - - # Create new ProcessedResponse with filtered content - filtered_chunk = ProcessedResponse( - content=filtered_content, - usage=chunk.usage, - metadata=chunk.metadata, - ) - - yield filtered_chunk - - # Return new StreamingResponseEnvelope with filtered stream - return StreamingResponseEnvelope( - content=filtered_stream(), - media_type=response.media_type, - headers=response.headers, - cancel_callback=response.cancel_callback, - ) - - @staticmethod - def _truncate_after_reasoning_close(reasoning_output: str) -> str: - """Trim reasoning output so that only the thinking segment remains.""" - - closing_tags = ["", "", "", ""] - for tag in closing_tags: - index = reasoning_output.find(tag) - if index != -1: - return reasoning_output[: index + len(tag)] - return reasoning_output - - def _format_reasoning_for_client( - self, - reasoning_output: str, - reasoning_backend: str, - ) -> str: - """Prepare reasoning text for client consumption with native tags.""" - - if not reasoning_output: - return "" - - _, plain_text = self._prepare_reasoning_texts( - reasoning_output, reasoning_backend - ) - return plain_text - - def _build_reasoning_stream_chunk( - self, - reasoning_output: str, - reasoning_backend: str, - reasoning_model: str, - formatted_reasoning: str | None = None, - ) -> ProcessedResponse | None: - """Create a processed response chunk that surfaces reasoning to clients.""" - - formatted = ( - formatted_reasoning.strip() - if formatted_reasoning - and "<" in formatted_reasoning - and ">" in formatted_reasoning - else "" - ) - if not formatted: - formatted, plain_reasoning = self._prepare_reasoning_texts( - reasoning_output, reasoning_backend - ) - else: - plain_reasoning = self._extract_reasoning_inner_text(formatted) - - if formatted_reasoning and formatted_reasoning.strip() and not formatted: - # formatted_reasoning provided but lacked tags; reuse for plain text - plain_reasoning = formatted_reasoning.strip() - formatted, _ = self._prepare_reasoning_texts( - reasoning_output, reasoning_backend - ) - if not formatted: - formatted = formatted_reasoning.strip() - - if not plain_reasoning: - return None - - delta_payload: dict[str, Any] = { - "role": "assistant", - "reasoning": formatted, - "reasoning_content": plain_reasoning, - "content": "", - } - - payload = { - "id": f"hybrid-reasoning-{uuid.uuid4().hex}", - "object": "chat.completion.chunk", - "created": int(time.time()), - "model": f"{reasoning_backend}:{reasoning_model}", - "choices": [ - { - "index": 0, - "delta": delta_payload, - "finish_reason": None, - } - ], - } - - sse_payload = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" - - return ProcessedResponse( - content=sse_payload, - usage=None, - metadata={ - "hybrid_phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - }, - ) - - def _build_tool_call_only_response( - self, - tool_calls: list[dict[str, Any]], - request_dict: dict[str, Any], - reasoning_backend: str, - reasoning_model: str, - ) -> ResponseEnvelope | StreamingResponseEnvelope: - """Construct a response that forwards tool calls without execution.""" - - stream_requested = bool(request_dict.get("stream", False)) - created_ts = int(time.time()) - model_name = f"{reasoning_backend}:{reasoning_model}" - - if stream_requested: - payload = { - "id": f"hybrid-tool-call-{uuid.uuid4().hex}", - "object": "chat.completion.chunk", - "created": created_ts, - "model": model_name, - "choices": [ - { - "index": 0, - "delta": { - "role": "assistant", - "content": "", - "tool_calls": tool_calls, - }, - "finish_reason": "tool_calls", - } - ], - } - sse_payload = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" - done_payload = "data: [DONE]\n\n" - - async def tool_call_stream(): - yield ProcessedResponse( - content=sse_payload, - metadata={ - "hybrid_phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "skipped_execution": True, - }, - ) - yield ProcessedResponse( - content=done_payload, - metadata={"is_done": True}, - ) - - return StreamingResponseEnvelope( - content=tool_call_stream(), - media_type="text/event-stream", - ) - - response_content = { - "id": f"hybrid-tool-call-{uuid.uuid4().hex}", - "object": "chat.completion", - "created": created_ts, - "model": model_name, - "choices": [ - { - "index": 0, - "message": { - "role": "assistant", - "content": "", - "tool_calls": tool_calls, - }, - "finish_reason": "tool_calls", - } - ], - } - - return ResponseEnvelope( - content=response_content, - metadata={ - "hybrid_phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "skipped_execution": True, - }, - ) - - def _prepend_reasoning_chunk_to_stream( - self, - response: StreamingResponseEnvelope, - reasoning_output: str, - reasoning_backend: str, - reasoning_model: str, - formatted_reasoning: str | None = None, - ) -> StreamingResponseEnvelope: - """Inject the reasoning chunk ahead of the execution stream.""" - - reasoning_chunk = self._build_reasoning_stream_chunk( - reasoning_output, - reasoning_backend, - reasoning_model, - formatted_reasoning=formatted_reasoning, - ) - if reasoning_chunk is None: - return response - - original_stream = response.content - - async def combined_stream(): - yield reasoning_chunk - if original_stream is None: - return - async for chunk in original_stream: - yield chunk - - return StreamingResponseEnvelope( - content=combined_stream(), - media_type=response.media_type, - headers=response.headers, - cancel_callback=response.cancel_callback, - ) - - def _prepend_reasoning_to_non_streaming_content( - self, - content: Any, - reasoning_output: str, - reasoning_backend: str, - reasoning_model: str, - formatted_reasoning: str | None = None, - ) -> Any: - """Attach reasoning output to non-streaming responses.""" - - tagged, plain = self._prepare_reasoning_texts( - reasoning_output, reasoning_backend - ) - if formatted_reasoning: - candidate = formatted_reasoning.strip() - if "<" in candidate and ">" in candidate: - tagged = candidate - plain = self._extract_reasoning_inner_text(candidate) or plain - elif candidate: - plain = candidate - if not plain or not tagged: - return content - - if isinstance(content, bytes): - return content - - if isinstance(content, str): - return content - - if isinstance(content, dict): - updated = deepcopy(content) - choices = updated.get("choices") - if isinstance(choices, list): - for choice in choices: - if not isinstance(choice, dict): - continue - - message = choice.get("message") - if isinstance(message, dict): - if "role" not in message: - message["role"] = "assistant" - message["reasoning"] = tagged - message["reasoning_content"] = plain - continue - - delta = choice.get("delta") - if isinstance(delta, dict): - if "role" not in delta: - delta["role"] = "assistant" - delta["reasoning"] = tagged - delta["reasoning_content"] = plain - else: - metadata = updated.setdefault("metadata", {}) - if isinstance(metadata, dict): - metadata["reasoning"] = tagged - metadata["reasoning_content"] = plain - metadata.setdefault("reasoning_format", "hybrid_injected") - return updated - - return content - - async def _execute_reasoning_phase( - self, - messages: list, - reasoning_backend: str, - reasoning_model: str, - request_data: DomainModel | InternalDTO | dict[str, Any], - identity: IAppIdentityConfig | None, - uri_params: dict[str, Any] | None = None, - ) -> ReasoningPhaseResult: - """Execute reasoning phase and capture output and metadata. - - Args: - messages: Original message history - reasoning_backend: Backend name for reasoning model - reasoning_model: Model name for reasoning - request_data: Original request data - identity: Optional identity configuration - - Returns: - Structured result containing reasoning text, tool calls, and stream metadata - - Raises: - BackendError: If reasoning model call fails (HTTP 502) - """ - import asyncio - - logger.info( - f"Starting reasoning phase with {reasoning_backend}:{reasoning_model}" - ) - - # Resolve reasoning backend connector from registry - if self._backend_registry is None: - logger.error("Backend registry not initialized for reasoning phase") - raise BackendError( - message="Backend registry not initialized", - code="backend_registry_not_initialized", - details={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - }, - ) - - # Create request payload for reasoning model - reasoning_preset_params = get_reasoning_params(reasoning_backend) - reasoning_request = self._apply_reasoning_params( - request_data, reasoning_preset_params - ) - - # Apply URI parameters if provided - if uri_params: - try: - # Validate URI parameters before applying - from src.core.services.uri_parameter_validator import ( - URIParameterValidator, - ) - - validator = URIParameterValidator() - normalized_params, validation_errors = validator.validate_and_normalize( - uri_params - ) - - if validation_errors: - logger.warning( - f"URI parameter validation errors for reasoning phase ({reasoning_backend}:{reasoning_model}): " - f"{', '.join(validation_errors)}. Invalid parameters will be excluded." - ) - - # Apply normalized parameters - if normalized_params: - reasoning_request = self._apply_parameter_overrides( - reasoning_request, normalized_params - ) - except Exception as param_error: - # Log error but continue without URI parameters - logger.warning( - f"Failed to apply URI parameters for reasoning phase ({reasoning_backend}:{reasoning_model}): " - f"{param_error}. Continuing without URI parameters." - ) - - # Prepare canonical request for backend service - # Use full backend:model format so backend_service can properly resolve it - canonical_reasoning_request = self._prepare_backend_request( - reasoning_request, - target_model=f"{reasoning_backend}:{reasoning_model}", - stream=True, - messages=messages, - ) - - # DEBUG: Log what we're sending - extra_body = getattr(canonical_reasoning_request, "extra_body", None) - if isinstance(extra_body, dict): - extra_body_keys = ", ".join(sorted(map(str, extra_body.keys()))) - elif extra_body is None: - extra_body_keys = "None" - else: - extra_body_keys = f"" - - logger.error( - "[HYBRID DEBUG] Prepared reasoning request: model=%s, extra_body_keys=%s", - canonical_reasoning_request.model, - extra_body_keys, - ) - - try: - from src.core.di.services import get_required_service - from src.core.domain.request_context import ( - RequestContext, - RequestCookies, - RequestHeaders, - ) - from src.core.services.backend_service import BackendService - - backend_service = get_required_service(BackendService) - - # Create a clean context without session to prevent session backend inheritance - # This ensures the reasoning backend is resolved from the model name, not session state - clean_context = RequestContext( - headers=RequestHeaders(), - cookies=RequestCookies(), - state=None, - app_state=None, - session_id=None, # No session to prevent backend inheritance - ) - - # Call reasoning model with timeout via backend service - try: - response = await asyncio.wait_for( - backend_service.call_completion( - canonical_reasoning_request, - stream=True, - allow_failover=False, - context=clean_context, # Prefer context-enabled call - ), - timeout=REASONING_PHASE_TIMEOUT, - ) - except TypeError as exc: - # Integration stubs may not accept context; retry without it. - if "context" not in str(exc): - raise - response = await asyncio.wait_for( - backend_service.call_completion( - canonical_reasoning_request, - stream=True, - allow_failover=False, - ), - timeout=REASONING_PHASE_TIMEOUT, - ) - - # Extract stream from response - response_media_type = getattr(response, "media_type", None) - response_headers = getattr(response, "headers", None) - - if isinstance(response, StreamingResponseEnvelope) and response.content: - stream = response.content - else: - logger.error( - "Reasoning model did not return streaming response", - extra={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "response_type": type(response).__name__, - }, - ) - raise BackendError( - message="Reasoning model did not return streaming response", - code="reasoning_no_stream", - details={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "response_type": type(response).__name__, - }, - ) - - # Use ReasoningStreamProcessor to capture reasoning output - processor = ReasoningStreamProcessor() - reasoning_text, reasoning_complete, metadata = ( - await processor.capture_reasoning_stream(stream) - ) - tool_calls = metadata.get("tool_calls") or [] - raw_chunks = metadata.get("raw_chunks") or [] - - # Cancel the stream if it has a cancel callback - if hasattr(response, "cancel_callback") and response.cancel_callback: - try: - await response.cancel_callback() - logger.debug( - "Reasoning stream cancelled successfully", - extra={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - }, - ) - except Exception as e: - logger.debug( - f"Stream cancellation failed (non-fatal): {e}", - extra={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "error": str(e), - }, - ) - - logger.info( - f"Reasoning phase complete: {len(reasoning_text)} chars captured, " - f"method={metadata.get('method')}, " - f"chunks={metadata.get('chunks_processed')}" - ) - - return ReasoningPhaseResult( - text=reasoning_text, - complete=reasoning_complete, - tool_calls=tool_calls, - raw_chunks=raw_chunks, - media_type=response_media_type, - headers=response_headers, - ) - - except ServiceResolutionError as e: - logger.error( - "Failed to resolve BackendService for reasoning phase", - extra={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "error": str(e), - }, - exc_info=True, - ) - raise BackendError( - message=f"Failed to initialize reasoning backend: {e}", - code="reasoning_backend_init_failed", - details={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - }, - ) from e - - except asyncio.TimeoutError as e: - # Handle timeout with partial reasoning fallback - logger.warning( - f"Reasoning phase timeout after {REASONING_PHASE_TIMEOUT}s, " - f"attempting to use partial reasoning output", - extra={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "timeout_seconds": REASONING_PHASE_TIMEOUT, - }, - ) - - # If we have partial reasoning output from the processor, use it - # Otherwise, raise the error - raise BackendError( - message=f"Reasoning phase timeout after {REASONING_PHASE_TIMEOUT}s", - code="reasoning_timeout", - details={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "timeout_seconds": REASONING_PHASE_TIMEOUT, - }, - ) from e - except BackendError: - # Re-raise BackendError as-is (already has proper context) - raise - except Exception as e: - logger.error( - f"Reasoning phase failed with unexpected error: {type(e).__name__}", - extra={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "error_type": type(e).__name__, - "error": str(e), - }, - exc_info=True, - ) - raise BackendError( - message=f"Reasoning phase failed: {e}", - code="reasoning_phase_failed", - details={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "error_type": type(e).__name__, - }, - ) from e - - def _prepare_backend_request( - self, - request_data: DomainModel | InternalDTO | dict[str, Any], - target_model: str, - stream: bool, - messages: list | None = None, - ) -> CanonicalChatRequest: - """Normalize request for backend service calls.""" - - from src.core.domain.chat import CanonicalChatRequest, ChatRequest - - request_obj: Any = request_data - - if hasattr(request_obj, "model_copy"): - request_obj = request_obj.model_copy( - update={"model": target_model, "stream": stream} - ) - elif isinstance(request_obj, dict): - request_dict = dict(request_obj) - request_dict["model"] = target_model - request_dict["stream"] = stream - request_obj = self.translation_service.to_domain_request( - request_dict, "openai" - ) - elif is_dataclass(request_obj) and not isinstance(request_obj, type): - request_dict = asdict(request_obj) - request_dict["model"] = target_model - request_dict["stream"] = stream - request_obj = self.translation_service.to_domain_request( - request_dict, "openai" - ) - elif isinstance(request_obj, ChatRequest): - request_obj = request_obj.model_copy( - update={"model": target_model, "stream": stream} - ) - else: - raise TypeError( - "Unable to prepare backend request from type " - f"{type(request_obj).__name__}" - ) - - if not isinstance(request_obj, CanonicalChatRequest): - request_obj = self.translation_service.to_domain_request( - request_obj, "openai" - ) - - if messages is not None: - request_obj = request_obj.model_copy(update={"messages": messages}) - - # Remove session_id from extra_body to prevent session backend inheritance - # This ensures the backend is resolved from the model name, not session state - if request_obj.extra_body and isinstance(request_obj.extra_body, dict): - keys_to_strip = {"session_id", "backend_type", "model"} - cleaned_extra_body = { - k: v - for k, v in request_obj.extra_body.items() - if k not in keys_to_strip - } - if len(cleaned_extra_body) != len(request_obj.extra_body): - request_obj = request_obj.model_copy( - update={ - "extra_body": cleaned_extra_body if cleaned_extra_body else None - } - ) - - return cast("CanonicalChatRequest", request_obj) - - async def _execute_execution_phase( - self, - request_data: DomainModel | InternalDTO | dict[str, Any], - augmented_messages: list, - execution_backend: str, - execution_model: str, - identity: IAppIdentityConfig | None, - reasoning_output_length: int = 0, - uri_params: dict[str, Any] | None = None, - **kwargs: Any, - ) -> ResponseEnvelope | StreamingResponseEnvelope: - """Execute execution phase with augmented messages. - - Args: - request_data: Original request data - augmented_messages: Messages with reasoning appended - execution_backend: Backend name for execution - execution_model: Model name for execution - identity: Optional identity configuration - reasoning_output_length: Length of reasoning output for logging - **kwargs: Additional arguments - - Returns: - Response from execution model - - Raises: - BackendError: If execution model call fails (HTTP 502) - """ - logger.info( - f"Starting execution phase with {execution_backend}:{execution_model}", - extra={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - }, - ) - - # Resolve execution backend connector from registry - if self._backend_registry is None: - logger.error("Backend registry not initialized for execution phase") - raise BackendError( - message="Backend registry not initialized", - code="backend_registry_not_initialized", - details={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - }, - ) - - try: - # Use backend factory to properly create and initialize the backend - from src.core.di.services import get_required_service - from src.core.services.backend_factory import BackendFactory - - backend_factory_instance = get_required_service(BackendFactory) - - # Get backend config for execution backend - execution_backend_config = None - if hasattr(self.config, "backends"): - with contextlib.suppress(AttributeError): - execution_backend_config = getattr( - self.config.backends, execution_backend - ) - - execution_identity = self._resolve_backend_identity( - execution_backend, identity, execution_backend_config - ) - - # Use ensure_backend which properly handles API key initialization - execution_connector = await backend_factory_instance.ensure_backend( - execution_backend, self.config, execution_backend_config - ) - - except ValueError as e: - logger.error( - f"Execution backend '{execution_backend}' not found in registry", - extra={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - "error": str(e), - }, - ) - raise BackendError( - message=f"Execution backend '{execution_backend}' not found: {e}", - code="execution_backend_not_found", - details={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - }, - ) from e - except Exception as e: - logger.error( - f"Failed to initialize execution backend '{execution_backend}'", - extra={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - "error": str(e), - }, - exc_info=True, - ) - raise BackendError( - message=f"Failed to initialize execution backend: {e}", - code="execution_backend_init_failed", - details={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - }, - ) from e - - # Create request payload with augmented messages - execution_preset_params = get_execution_params(execution_backend) - execution_request = self._apply_reasoning_params( - request_data, execution_preset_params - ) - - # Apply URI parameters if provided - if uri_params: - try: - # Validate URI parameters before applying - from src.core.services.uri_parameter_validator import ( - URIParameterValidator, - ) - - validator = URIParameterValidator() - normalized_params, validation_errors = validator.validate_and_normalize( - uri_params - ) - - if validation_errors: - logger.warning( - f"URI parameter validation errors for execution phase ({execution_backend}:{execution_model}): " - f"{', '.join(validation_errors)}. Invalid parameters will be excluded." - ) - - # Apply normalized parameters - if normalized_params: - execution_request = self._apply_parameter_overrides( - execution_request, normalized_params - ) - except Exception as param_error: - # Log error but continue without URI parameters - logger.warning( - f"Failed to apply URI parameters for execution phase ({execution_backend}:{execution_model}): " - f"{param_error}. Continuing without URI parameters." - ) - - try: - # Call execution model with augmented messages and timeout - response = await asyncio.wait_for( - execution_connector.chat_completions( - request_data=execution_request, - processed_messages=augmented_messages, - effective_model=execution_model, - identity=execution_identity, - **kwargs, - ), - timeout=EXECUTION_PHASE_TIMEOUT, - ) - - logger.info( - "Execution phase complete", - extra={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - }, - ) - - return response - - except asyncio.TimeoutError as e: - # Handle execution timeout - logger.error( - f"Execution phase timeout after {EXECUTION_PHASE_TIMEOUT}s", - extra={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - "timeout_seconds": EXECUTION_PHASE_TIMEOUT, - }, - ) - raise BackendError( - message=f"Execution phase timeout after {EXECUTION_PHASE_TIMEOUT}s", - code="execution_timeout", - details={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - "timeout_seconds": EXECUTION_PHASE_TIMEOUT, - }, - ) from e - - except BackendError: - # Re-raise BackendError as-is (already has proper context) - raise - except AuthenticationError: - # Re-raise AuthenticationError as-is (already has proper context) - raise - except Exception as e: - logger.error( - f"Execution phase failed with unexpected error: {type(e).__name__}", - extra={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - "error_type": type(e).__name__, - "error": str(e), - }, - exc_info=True, - ) - raise BackendError( - message=f"Execution phase failed: {e}", - code="execution_phase_failed", - details={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": reasoning_output_length, - "error_type": type(e).__name__, - }, - ) from e - - @staticmethod - def _extract_message_role(message: Any) -> str | None: - """Best-effort extraction of a message role.""" - - role = getattr(message, "role", None) - if isinstance(role, str) and role: - return role - - if isinstance(message, dict): - role_value = message.get("role") - return role_value if isinstance(role_value, str) else None - - if hasattr(message, "model_dump") and callable(message.model_dump): - try: - dumped = message.model_dump() - role_value = dumped.get("role") - if isinstance(role_value, str): - return role_value - except Exception: - return None - - if hasattr(message, "get") and callable(message.get): - try: - role_value = message.get("role") - if isinstance(role_value, str): - return role_value - except Exception: - return None - - return None - - def _is_first_user_turn( - self, - processed_messages: list[Any] | None, - request_messages: list[Any] | None, - ) -> bool: - """Determine whether the current request represents the first user turn.""" - - messages_to_check: list[Any] = [] - if processed_messages: - messages_to_check = list(processed_messages) - elif request_messages: - messages_to_check = list(request_messages) - - if not messages_to_check: - # No prior context available; treat as first turn to err on the side of reasoning. - return True - - for message in messages_to_check: - role = self._extract_message_role(message) - if not role: - continue - normalized_role = role.strip().lower() - if normalized_role in {"assistant", "tool", "function"}: - return False - - return True - - async def chat_completions( - self, - request_data: DomainModel | InternalDTO | dict[str, Any], - processed_messages: list, - effective_model: str, - identity: IAppIdentityConfig | None = None, - **kwargs: Any, - ) -> ResponseEnvelope | StreamingResponseEnvelope: - """Execute the two-phase hybrid completion. - - Args: - request_data: Original request - processed_messages: Messages after command processing - effective_model: Format "hybrid:[backend:model,backend:model]" - identity: Optional identity configuration - **kwargs: Additional arguments - - Returns: - StreamingResponseEnvelope with execution model's response - - Raises: - ValueError: If model specification is invalid - ConfigurationError: If hybrid backend is disabled - BackendError: If either phase fails (HTTP 502) - """ - start_time = time.time() - - # Extract session ID for logging if available - session_id = None - if identity and hasattr(identity, "session_id"): - session_id = identity.session_id - - # Check if hybrid backend is disabled - if ( - hasattr(self.config, "backends") - and hasattr(self.config.backends, "disable_hybrid_backend") - and self.config.backends.disable_hybrid_backend - ): - logger.warning( - "Hybrid backend request rejected - backend is disabled", - extra={"session_id": session_id}, - ) - raise ConfigurationError( - message="Hybrid backend is disabled", - code="hybrid_backend_disabled", - ) - - # Convert request_data to dict if needed - if isinstance(request_data, DomainModel): - request_dict = request_data.model_dump() - elif isinstance(request_data, dict): - request_dict = request_data - elif is_dataclass(request_data) and not isinstance(request_data, type): - request_dict = asdict(request_data) - else: - raise TypeError( - "request_data must be a Pydantic model, a dict, or a dataclass, " - f"but got {type(request_data)}" - ) - - try: - # Parse hybrid model specification with URI parameters - ( - reasoning_backend, - reasoning_model, - reasoning_params, - execution_backend, - execution_model, - execution_params, - ) = self._parse_hybrid_model_spec(effective_model) - - # Check for reasoning_effort parameter and log warning - has_reasoning_effort_in_reasoning = "reasoning_effort" in reasoning_params - has_reasoning_effort_in_execution = "reasoning_effort" in execution_params - - if has_reasoning_effort_in_reasoning or has_reasoning_effort_in_execution: - logger.warning( - "reasoning_effort parameter in hybrid model string is not effective. " - "Hybrid backend enforces reasoning effort by design.", - extra={ - "session_id": session_id, - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_effort_in_reasoning": has_reasoning_effort_in_reasoning, - "reasoning_effort_in_execution": has_reasoning_effort_in_execution, - }, - ) - - # Remove reasoning_effort from parameters to prevent it from being applied - if has_reasoning_effort_in_reasoning: - reasoning_params = { - k: v - for k, v in reasoning_params.items() - if k != "reasoning_effort" - } - if has_reasoning_effort_in_execution: - execution_params = { - k: v - for k, v in execution_params.items() - if k != "reasoning_effort" - } - - # Log hybrid request initiation with session and model details - logger.info( - f"Hybrid request initiated: reasoning={reasoning_backend}:{reasoning_model} (params={reasoning_params}), " - f"execution={execution_backend}:{execution_model} (params={execution_params})", - extra={ - "session_id": session_id, - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "reasoning_params": reasoning_params, - "execution_backend": execution_backend, - "execution_model": execution_model, - "execution_params": execution_params, - "message_count": len(processed_messages), - "stream": request_dict.get("stream", False), - }, - ) - - except ValueError as e: - logger.error( - f"Invalid hybrid model specification: {e}", - extra={ - "session_id": session_id, - "effective_model": effective_model, - "error": str(e), - }, - ) - raise - - reasoning_output = "" - client_reasoning = "" - has_reasoning_content = False - reasoning_time = 0.0 - - # Check for temporary reasoning injection probability override from edit precision middleware - temp_reasoning_probability = None - if isinstance(request_data, dict): - extra_body = request_data.get("extra_body", {}) - else: - extra_body = getattr(request_data, "extra_body", {}) - if extra_body is None: - extra_body = {} - - # Check if edit precision middleware has set a temporary override - temp_prob_override = extra_body.get("_temp_hybrid_reasoning_probability") - if temp_prob_override is not None: - temp_reasoning_probability = float(temp_prob_override) - # Log that we're using a temporary override - logger.info( - f"Using temporary reasoning injection probability override: {temp_reasoning_probability} for session", - extra={"session_id": session_id}, - ) - else: - temp_reasoning_probability = ( - self.config.backends.reasoning_injection_probability - ) - - raw_request_messages = request_dict.get("messages") - request_messages: list[Any] | None = None - if isinstance(raw_request_messages, list): - request_messages = raw_request_messages - - is_first_turn = self._is_first_user_turn( - processed_messages=processed_messages, request_messages=request_messages - ) - - if is_first_turn: - use_reasoning = True - logger.info( - "Reasoning model injection decision: FORCE (first user turn), probability=%s", - temp_reasoning_probability, - ) - else: - random_draw = random.random() - use_reasoning = random_draw < temp_reasoning_probability - logger.info( - "Reasoning model injection decision: %s, probability=%s, draw=%.4f", - "USE" if use_reasoning else "SKIP", - temp_reasoning_probability, - random_draw, - ) - - if use_reasoning: - try: - # Phase 1: Execute reasoning phase and capture output - reasoning_result = await self._execute_reasoning_phase( - messages=processed_messages, - reasoning_backend=reasoning_backend, - reasoning_model=reasoning_model, - request_data=request_data, # Pass original request_data, not dict - identity=identity, - uri_params=reasoning_params, - ) - reasoning_output = reasoning_result.text - - reasoning_time = time.time() - start_time - client_reasoning = self._format_reasoning_for_client( - reasoning_output, reasoning_backend - ) - has_reasoning_content = self._has_reasoning_content(client_reasoning) - if not has_reasoning_content: - client_reasoning = "" - - # Log reasoning phase completion with output length and duration - logger.info( - f"Reasoning phase completed: {len(reasoning_output)} chars captured in {reasoning_time:.2f}s", - extra={ - "session_id": session_id, - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "output_length": len(reasoning_output), - "duration_seconds": reasoning_time, - "tool_call_count": len(reasoning_result.tool_calls), - }, - ) - - skip_execution_due_to_tool_call = ( - not has_reasoning_content and reasoning_result.has_tool_calls() - ) - if skip_execution_due_to_tool_call: - logger.info( - "[hybrid-backend] Skipping call to the execution model as reasoning model produced no reasoning output but a tool call", - extra={ - "session_id": session_id, - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "tool_call_count": len(reasoning_result.tool_calls), - }, - ) - return self._build_tool_call_only_response( - tool_calls=reasoning_result.tool_calls, - request_dict=request_dict, - reasoning_backend=reasoning_backend, - reasoning_model=reasoning_model, - ) - - except BackendError as e: - logger.error( - f"Reasoning phase failed: {e.message}", - extra={ - "session_id": session_id, - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "error_code": e.code, - "error": e.message, - }, - ) - raise BackendError( - message=f"Hybrid backend error (reasoning phase): {e.message}", - code="hybrid_reasoning_failed", - details={ - "phase": "reasoning", - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "original_error": e.code, - }, - ) from e - - try: - # Phase 2: Augment messages with reasoning output - augmented_messages = self._augment_messages( - messages=processed_messages, - reasoning_output=reasoning_output, - execution_backend=execution_backend, - ) - - logger.debug( - f"Messages augmented: {len(processed_messages)} -> {len(augmented_messages)} messages", - extra={ - "session_id": session_id, - "original_message_count": len(processed_messages), - "augmented_message_count": len(augmented_messages), - "reasoning_output_length": len(reasoning_output), - }, - ) - - except Exception as e: - logger.error( - f"Message augmentation failed: {e}", - extra={ - "session_id": session_id, - "execution_backend": execution_backend, - "reasoning_output_length": len(reasoning_output), - "error": str(e), - }, - exc_info=True, - ) - raise BackendError( - message=f"Failed to augment messages with reasoning: {e}", - code="hybrid_augmentation_failed", - details={ - "execution_backend": execution_backend, - "reasoning_output_length": len(reasoning_output), - }, - ) from e - - try: - # Phase 3: Execute execution phase with augmented messages - response = await self._execute_execution_phase( - request_data=request_data, # Pass original request_data, not dict - augmented_messages=augmented_messages, - execution_backend=execution_backend, - execution_model=execution_model, - identity=identity, - reasoning_output_length=len(reasoning_output), - uri_params=execution_params, - **kwargs, - ) - - # Phase 4: Filter reasoning tags from response - if isinstance(response, StreamingResponseEnvelope): - if logger.isEnabledFor(TRACE_LEVEL): - logger.log( - TRACE_LEVEL, - "Filtering reasoning tags from streaming response", - extra={ - "session_id": session_id, - "execution_backend": execution_backend, - "execution_model": execution_model, - }, - ) - response = await self._filter_response_stream(response) - response = self._prepend_reasoning_chunk_to_stream( - response, - reasoning_output, - reasoning_backend, - reasoning_model, - formatted_reasoning=client_reasoning, - ) - elif isinstance(response, ResponseEnvelope): - logger.debug( - "Filtering reasoning tags from non-streaming response", - extra={ - "session_id": session_id, - "execution_backend": execution_backend, - "execution_model": execution_model, - }, - ) - filtered_content = self._filter_response_content(response.content) - response.content = self._prepend_reasoning_to_non_streaming_content( - filtered_content, - reasoning_output, - reasoning_backend, - reasoning_model, - formatted_reasoning=client_reasoning, - ) - if client_reasoning: - if response.metadata is None: - response.metadata = {} - response.metadata.setdefault("reasoning", client_reasoning) - response.metadata.setdefault("reasoning_format", "hybrid_injected") - response.metadata.setdefault("reasoning_backend", reasoning_backend) - response.metadata.setdefault("reasoning_model", reasoning_model) - - total_time = time.time() - start_time - execution_time = total_time - reasoning_time - - # Log execution phase completion with total duration - logger.info( - f"Hybrid request completed: total={total_time:.2f}s " - f"(reasoning={reasoning_time:.2f}s, execution={execution_time:.2f}s)", - extra={ - "session_id": session_id, - "reasoning_backend": reasoning_backend, - "reasoning_model": reasoning_model, - "execution_backend": execution_backend, - "execution_model": execution_model, - "total_duration_seconds": total_time, - "reasoning_duration_seconds": reasoning_time, - "execution_duration_seconds": execution_time, - "reasoning_output_length": len(reasoning_output), - }, - ) - - return response - - except BackendError as e: - logger.error( - f"Execution phase failed: {e.message}", - extra={ - "session_id": session_id, - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": len(reasoning_output), - "error_code": e.code, - "error": e.message, - }, - ) - raise BackendError( - message=f"Hybrid backend error (execution phase): {e.message}", - code="hybrid_execution_failed", - details={ - "phase": "execution", - "execution_backend": execution_backend, - "execution_model": execution_model, - "reasoning_output_length": len(reasoning_output), - "original_error": e.code, - }, - ) from e - - -# Register the hybrid backend -backend_registry.register_backend("hybrid", HybridConnector) diff --git a/src/connectors/hybrid/__init__.py b/src/connectors/hybrid/__init__.py new file mode 100644 index 00000000..93dc0535 --- /dev/null +++ b/src/connectors/hybrid/__init__.py @@ -0,0 +1,12 @@ +"""Hybrid backend connector package.""" + +from __future__ import annotations + +from src.core.services.backend_registry import backend_registry + +from .connector import HybridConnector, logger +from .types import ReasoningPhaseResult + +backend_registry.register_backend("hybrid", HybridConnector) + +__all__ = ["HybridConnector", "ReasoningPhaseResult", "logger"] diff --git a/src/connectors/hybrid/connector.py b/src/connectors/hybrid/connector.py new file mode 100644 index 00000000..5c463be2 --- /dev/null +++ b/src/connectors/hybrid/connector.py @@ -0,0 +1,494 @@ +"""Hybrid backend connector implementation.""" + +from __future__ import annotations + +import logging +import random +import time +from dataclasses import asdict, is_dataclass +from typing import TYPE_CHECKING, Any + +import httpx + +from src.connectors.base import LLMBackend +from src.core.app.constants.logging_constants import TRACE_LEVEL +from src.core.common.exceptions import ( + AuthenticationError, + BackendError, + ConfigurationError, +) +from src.core.config.app_config import AppConfig +from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope +from src.core.interfaces.configuration_interface import IAppIdentityConfig +from src.core.interfaces.model_bases import DomainModel, InternalDTO + +from .message_augmentation import HybridMessageAugmentationMixin +from .model_spec import HybridModelSpecMixin +from .parameters import HybridParameterMixin +from .phases import HybridPhaseMixin +from .reasoning_markup import HybridReasoningMarkupMixin +from .request_preparation import HybridRequestPreparationMixin +from .response_filtering import HybridResponseFilteringMixin +from .utils import HybridConnectorUtilsMixin + +if TYPE_CHECKING: + from src.core.services.backend_registry import BackendRegistry + from src.core.services.translation_service import TranslationService + +logger = logging.getLogger(__name__) + + +class HybridConnector( + HybridModelSpecMixin, + HybridParameterMixin, + HybridReasoningMarkupMixin, + HybridMessageAugmentationMixin, + HybridResponseFilteringMixin, + HybridRequestPreparationMixin, + HybridPhaseMixin, + HybridConnectorUtilsMixin, + LLMBackend, +): + """LLMBackend implementation for hybrid two-phase reasoning approach.""" + + backend_type: str = "hybrid" + + def __init__( + self, + client: httpx.AsyncClient, + config: AppConfig, + translation_service: TranslationService, + backend_registry: BackendRegistry | None = None, + ) -> None: + super().__init__(config=config) + self.client = client + self.config = config + self.translation_service = translation_service + self._backend_registry = backend_registry + + async def initialize(self, **kwargs: Any) -> None: + """Initialize the hybrid backend.""" + + if self._backend_registry is None: + from src.core.services.backend_registry import ( + backend_registry as default_registry, + ) + + self._backend_registry = default_registry + + if ( + hasattr(self.config, "backends") + and hasattr(self.config.backends, "disable_hybrid_backend") + and self.config.backends.disable_hybrid_backend + ): + logger.warning("Hybrid backend is disabled in configuration") + + logger.info("Hybrid backend initialized successfully") + + async def chat_completions( + self, + request_data: DomainModel | InternalDTO | dict[str, Any], + processed_messages: list, + effective_model: str, + identity: IAppIdentityConfig | None = None, + **kwargs: Any, + ) -> ResponseEnvelope | StreamingResponseEnvelope: + """Execute the two-phase hybrid completion.""" + + start_time = time.time() + session_id = getattr(identity, "session_id", None) if identity else None + + if ( + hasattr(self.config, "backends") + and hasattr(self.config.backends, "disable_hybrid_backend") + and self.config.backends.disable_hybrid_backend + ): + logger.warning( + "Hybrid backend request rejected - backend is disabled", + extra={"session_id": session_id}, + ) + raise ConfigurationError( + message="Hybrid backend is disabled", + code="hybrid_backend_disabled", + ) + + if isinstance(request_data, DomainModel): + request_dict = request_data.model_dump() + elif isinstance(request_data, dict): + request_dict = request_data + elif is_dataclass(request_data) and not isinstance(request_data, type): + request_dict = asdict(request_data) + else: + raise TypeError( + "request_data must be a Pydantic model, a dict, or a dataclass, " + f"but got {type(request_data)}" + ) + + try: + ( + reasoning_backend, + reasoning_model, + reasoning_params, + execution_backend, + execution_model, + execution_params, + ) = self._parse_hybrid_model_spec(effective_model) + + has_reasoning_effort_in_reasoning = "reasoning_effort" in reasoning_params + has_reasoning_effort_in_execution = "reasoning_effort" in execution_params + + if has_reasoning_effort_in_reasoning or has_reasoning_effort_in_execution: + logger.warning( + "reasoning_effort parameter in hybrid model string is not effective. " + "Hybrid backend enforces reasoning effort by design.", + extra={ + "session_id": session_id, + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_effort_in_reasoning": has_reasoning_effort_in_reasoning, + "reasoning_effort_in_execution": has_reasoning_effort_in_execution, + }, + ) + + if has_reasoning_effort_in_reasoning: + reasoning_params = { + key: value + for key, value in reasoning_params.items() + if key != "reasoning_effort" + } + if has_reasoning_effort_in_execution: + execution_params = { + key: value + for key, value in execution_params.items() + if key != "reasoning_effort" + } + + logger.info( + "Hybrid request initiated: reasoning=%s:%s (params=%s), execution=%s:%s (params=%s)", + reasoning_backend, + reasoning_model, + reasoning_params, + execution_backend, + execution_model, + execution_params, + extra={ + "session_id": session_id, + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "reasoning_params": reasoning_params, + "execution_backend": execution_backend, + "execution_model": execution_model, + "execution_params": execution_params, + "message_count": len(processed_messages), + "stream": request_dict.get("stream", False), + }, + ) + + except ValueError as exc: + logger.error( + "Invalid hybrid model specification: %s", + exc, + extra={ + "session_id": session_id, + "effective_model": effective_model, + "error": str(exc), + }, + ) + raise + + reasoning_output = "" + client_reasoning = "" + has_reasoning_content = False + reasoning_time = 0.0 + + if isinstance(request_data, dict): + extra_body = request_data.get("extra_body", {}) + else: + extra_body = getattr(request_data, "extra_body", {}) or {} + + temp_prob_override = extra_body.get("_temp_hybrid_reasoning_probability") + if temp_prob_override is not None: + temp_reasoning_probability = float(temp_prob_override) + logger.info( + "Using temporary reasoning injection probability override: %s for session", + temp_reasoning_probability, + extra={"session_id": session_id}, + ) + else: + temp_reasoning_probability = ( + self.config.backends.reasoning_injection_probability + ) + + raw_request_messages = request_dict.get("messages") + request_messages: list[Any] | None = None + if isinstance(raw_request_messages, list): + request_messages = raw_request_messages + + is_first_turn = self._is_first_user_turn( + processed_messages=processed_messages, request_messages=request_messages + ) + + if is_first_turn: + use_reasoning = True + logger.info( + "Reasoning model injection decision: FORCE (first user turn), probability=%s", + temp_reasoning_probability, + ) + else: + random_draw = random.random() + use_reasoning = random_draw < temp_reasoning_probability + logger.info( + "Reasoning model injection decision: %s, probability=%s, draw=%.4f", + "USE" if use_reasoning else "SKIP", + temp_reasoning_probability, + random_draw, + ) + + if use_reasoning: + try: + reasoning_result = await self._execute_reasoning_phase( + messages=processed_messages, + reasoning_backend=reasoning_backend, + reasoning_model=reasoning_model, + request_data=request_data, + identity=identity, + uri_params=reasoning_params, + ) + reasoning_output = reasoning_result.text + + reasoning_time = time.time() - start_time + client_reasoning = self._format_reasoning_for_client( + reasoning_output, reasoning_backend + ) + has_reasoning_content = self._has_reasoning_content(client_reasoning) + if not has_reasoning_content: + client_reasoning = "" + + logger.info( + "Reasoning phase completed: %s chars captured in %.2fs", + len(reasoning_output), + reasoning_time, + extra={ + "session_id": session_id, + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "output_length": len(reasoning_output), + "duration_seconds": reasoning_time, + "tool_call_count": len(reasoning_result.tool_calls), + }, + ) + + skip_execution_due_to_tool_call = ( + not has_reasoning_content and reasoning_result.has_tool_calls() + ) + if skip_execution_due_to_tool_call: + logger.info( + "[hybrid-backend] Skipping call to the execution model as reasoning model produced no reasoning output but a tool call", + extra={ + "session_id": session_id, + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "tool_call_count": len(reasoning_result.tool_calls), + }, + ) + return self._build_tool_call_only_response( + tool_calls=reasoning_result.tool_calls, + request_dict=request_dict, + reasoning_backend=reasoning_backend, + reasoning_model=reasoning_model, + ) + + except BackendError as exc: + logger.error( + "Reasoning phase failed: %s", + exc.message, + extra={ + "session_id": session_id, + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "error_code": exc.code, + "error": exc.message, + }, + ) + raise BackendError( + message=f"Hybrid backend error (reasoning phase): {exc.message}", + code="hybrid_reasoning_failed", + details={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "original_error": exc.code, + }, + ) from exc + + try: + augmented_messages = self._augment_messages( + messages=processed_messages, + reasoning_output=reasoning_output, + execution_backend=execution_backend, + ) + + logger.debug( + "Messages augmented: %s -> %s messages", + len(processed_messages), + len(augmented_messages), + extra={ + "session_id": session_id, + "original_message_count": len(processed_messages), + "augmented_message_count": len(augmented_messages), + "reasoning_output_length": len(reasoning_output), + }, + ) + + except Exception as exc: + logger.error( + "Message augmentation failed: %s", + exc, + extra={ + "session_id": session_id, + "execution_backend": execution_backend, + "reasoning_output_length": len(reasoning_output), + "error": str(exc), + }, + exc_info=True, + ) + raise BackendError( + message=f"Failed to augment messages with reasoning: {exc}", + code="hybrid_augmentation_failed", + details={ + "execution_backend": execution_backend, + "reasoning_output_length": len(reasoning_output), + }, + ) from exc + + try: + response = await self._execute_execution_phase( + request_data=request_data, + augmented_messages=augmented_messages, + execution_backend=execution_backend, + execution_model=execution_model, + identity=identity, + reasoning_output_length=len(reasoning_output), + uri_params=execution_params, + **kwargs, + ) + + if isinstance(response, StreamingResponseEnvelope): + if logger.isEnabledFor(TRACE_LEVEL): + logger.log( + TRACE_LEVEL, + "Filtering reasoning tags from streaming response", + extra={ + "session_id": session_id, + "execution_backend": execution_backend, + "execution_model": execution_model, + }, + ) + response = await self._filter_response_stream(response) + response = self._prepend_reasoning_chunk_to_stream( + response, + reasoning_output, + reasoning_backend, + reasoning_model, + formatted_reasoning=client_reasoning, + ) + elif isinstance(response, ResponseEnvelope): + logger.debug( + "Filtering reasoning tags from non-streaming response", + extra={ + "session_id": session_id, + "execution_backend": execution_backend, + "execution_model": execution_model, + }, + ) + filtered_content = self._filter_response_content(response.content) + response.content = self._prepend_reasoning_to_non_streaming_content( + filtered_content, + reasoning_output, + reasoning_backend, + reasoning_model, + formatted_reasoning=client_reasoning, + ) + if client_reasoning: + if response.metadata is None: + response.metadata = {} + response.metadata.setdefault("reasoning", client_reasoning) + response.metadata.setdefault("reasoning_format", "hybrid_injected") + response.metadata.setdefault("reasoning_backend", reasoning_backend) + response.metadata.setdefault("reasoning_model", reasoning_model) + + total_time = time.time() - start_time + execution_time = total_time - reasoning_time + + logger.info( + "Hybrid request completed: total=%.2fs (reasoning=%.2fs, execution=%.2fs)", + total_time, + reasoning_time, + execution_time, + extra={ + "session_id": session_id, + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "execution_backend": execution_backend, + "execution_model": execution_model, + "total_duration_seconds": total_time, + "reasoning_duration_seconds": reasoning_time, + "execution_duration_seconds": execution_time, + "reasoning_output_length": len(reasoning_output), + }, + ) + + return response + + except BackendError as exc: + logger.error( + "Execution phase failed: %s", + exc.message, + extra={ + "session_id": session_id, + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": len(reasoning_output), + "error_code": exc.code, + "error": exc.message, + }, + ) + raise BackendError( + message=f"Hybrid backend error (execution phase): {exc.message}", + code="hybrid_execution_failed", + details={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": len(reasoning_output), + "original_error": exc.code, + }, + ) from exc + + except AuthenticationError: + raise + + except Exception as exc: + logger.error( + "Hybrid backend failed with unexpected error: %s", + type(exc).__name__, + extra={ + "session_id": session_id, + "error_type": type(exc).__name__, + "error": str(exc), + }, + exc_info=True, + ) + raise BackendError( + message=f"Hybrid backend execution failed: {exc}", + code="hybrid_execution_unexpected_error", + details={ + "error_type": type(exc).__name__, + "session_id": session_id, + }, + ) from exc diff --git a/src/connectors/hybrid/constants.py b/src/connectors/hybrid/constants.py new file mode 100644 index 00000000..8d750548 --- /dev/null +++ b/src/connectors/hybrid/constants.py @@ -0,0 +1,4 @@ +"""Constants for the hybrid connector.""" + +REASONING_PHASE_TIMEOUT = 60.0 # seconds +EXECUTION_PHASE_TIMEOUT = 120.0 # seconds diff --git a/src/connectors/hybrid/logging_utils.py b/src/connectors/hybrid/logging_utils.py new file mode 100644 index 00000000..b694bce4 --- /dev/null +++ b/src/connectors/hybrid/logging_utils.py @@ -0,0 +1,20 @@ +"""Logging helpers for the hybrid connector.""" + +from __future__ import annotations + +import logging +import sys +from typing import Any, cast + +_BASE_LOGGER = logging.getLogger("src.connectors.hybrid") + + +def get_hybrid_logger() -> logging.Logger: + """Return the shared hybrid connector logger, honoring runtime patches.""" + + package = sys.modules.get("src.connectors.hybrid") + if package is not None: + candidate: Any = getattr(package, "logger", None) + if candidate is not None: + return cast(logging.Logger, candidate) + return _BASE_LOGGER diff --git a/src/connectors/hybrid/message_augmentation.py b/src/connectors/hybrid/message_augmentation.py new file mode 100644 index 00000000..a582663e --- /dev/null +++ b/src/connectors/hybrid/message_augmentation.py @@ -0,0 +1,105 @@ +"""Message augmentation helpers for the hybrid connector.""" + +from __future__ import annotations + +import copy +import logging + +logger = logging.getLogger(__name__) + + +class HybridMessageAugmentationMixin: + """Logic for injecting reasoning output into conversation history.""" + + def _format_reasoning_for_model(self, reasoning_output: str, backend: str) -> str: + """Format reasoning with model-specific tags.""" + + tagged, plain = self._prepare_reasoning_texts(reasoning_output, backend) + return tagged if plain else "" + + def _inject_as_system_message( + self, messages: list, reasoning_output: str, execution_backend: str + ) -> list: + """Inject reasoning as system message.""" + + messages_copy = copy.deepcopy(messages) + formatted_reasoning = self._format_reasoning_for_model( + reasoning_output, execution_backend + ) + if not formatted_reasoning: + return messages_copy + + system_content = ( + "Consider this reasoning when formulating your response:\n\n" + f"{formatted_reasoning}" + ) + + has_system_message = False + for idx, message in enumerate(messages_copy): + if isinstance(message, dict) and message.get("role") == "system": + messages_copy[idx][ + "content" + ] = f"{message['content']}\n\n{system_content}" + has_system_message = True + break + + if not has_system_message: + system_message = {"role": "system", "content": system_content} + messages_copy.insert(0, system_message) + + return messages_copy + + def _inject_to_user_message( + self, messages: list, reasoning_output: str, execution_backend: str + ) -> list: + """Inject reasoning as prefix to first user message.""" + + messages_copy = copy.deepcopy(messages) + formatted_reasoning = self._format_reasoning_for_model( + reasoning_output, execution_backend + ) + if not formatted_reasoning: + return messages_copy + + for idx, message in enumerate(messages_copy): + if isinstance(message, dict) and message.get("role") == "user": + original_content = message.get("content", "") + messages_copy[idx][ + "content" + ] = f"{formatted_reasoning}\n\n{original_content}" + break + + return messages_copy + + def _augment_messages( + self, messages: list, reasoning_output: str, execution_backend: str + ) -> list: + """Augment messages with reasoning using adaptive placement strategy.""" + + if not messages: + logger.warning("Empty message list provided for augmentation") + return messages + + if self._supports_system_messages(execution_backend): + logger.debug("Using system message injection for %s", execution_backend) + augmented_messages = self._inject_as_system_message( + messages, reasoning_output, execution_backend + ) + else: + logger.debug( + "Using user message prefix injection for %s", execution_backend + ) + augmented_messages = self._inject_to_user_message( + messages, reasoning_output, execution_backend + ) + + if self.config.backends.hybrid_backend_repeat_messages: + formatted_reasoning = self._format_reasoning_for_model( + reasoning_output, execution_backend + ) + if formatted_reasoning: + augmented_messages.append( + {"role": "assistant", "content": formatted_reasoning} + ) + + return augmented_messages diff --git a/src/connectors/hybrid/model_spec.py b/src/connectors/hybrid/model_spec.py new file mode 100644 index 00000000..208cac12 --- /dev/null +++ b/src/connectors/hybrid/model_spec.py @@ -0,0 +1,139 @@ +"""Model specification parsing for the hybrid connector.""" + +from __future__ import annotations + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + + +class HybridModelSpecMixin: + """Provide hybrid model specification parsing helpers.""" + + def _parse_hybrid_model_spec( + self, model_spec: str + ) -> tuple[str, str, dict[str, Any], str, str, dict[str, Any]]: + """Parse hybrid model specification with optional URI parameters.""" + + from src.core.domain.model_utils import parse_model_with_params + + # Remove "hybrid:" prefix if present + if model_spec.startswith("hybrid:"): + model_spec = model_spec[7:] + + # Check for brackets + if not model_spec.startswith("[") or not model_spec.endswith("]"): + raise ValueError( + "Invalid hybrid model format. Expected: hybrid:[reasoning-backend:reasoning-model,execution-backend:execution-model]. " + "Example: hybrid:[minimax:MiniMax-M2,qwen-oauth:qwen3-coder-plus]" + ) + + # Remove brackets + model_spec = model_spec[1:-1] + + # Split by comma - need to be careful with commas in query strings + parts: list[str] = [] + current_part: list[str] = [] + + i = 0 + while i < len(model_spec): + char = model_spec[i] + + if char == ",": + current_str = "".join(current_part) + has_colon = ":" in current_str + current_str.find("?") + + if has_colon: + parts.append(current_str) + current_part = [] + i += 1 + continue + + current_part.append(char) + i += 1 + + if current_part: + parts.append("".join(current_part)) + + if len(parts) != 2: + raise ValueError( + f"Invalid hybrid model format. Expected exactly 2 models separated by comma, got {len(parts)}. " + "Expected: hybrid:[reasoning-backend:reasoning-model,execution-backend:execution-model]. " + "Example: hybrid:[minimax:MiniMax-M2,qwen-oauth:qwen3-coder-plus]" + ) + + reasoning_spec = parts[0].strip() + execution_spec = parts[1].strip() + + try: + reasoning_backend, reasoning_model, reasoning_params = ( + parse_model_with_params(reasoning_spec) + ) + except Exception as exc: # pragma: no cover - defensive logging + logger.warning( + "Failed to parse reasoning model specification '%s': %s. Attempting to continue with fallback parsing.", + reasoning_spec, + exc, + ) + raise ValueError( + f"Invalid reasoning model specification: '{reasoning_spec}'. " + f"Error: {exc}. " + "Expected format: backend:model or backend:model?params. " + "Example: minimax:MiniMax-M2?temperature=0.8" + ) from exc + + reasoning_backend = reasoning_backend.strip() + reasoning_model = reasoning_model.strip() + + if not reasoning_backend or not reasoning_model: + raise ValueError( + f"Incomplete reasoning model specification: '{reasoning_spec}'. Both backend and model must be non-empty. " + "Example: minimax:MiniMax-M2" + ) + + try: + execution_backend, execution_model, execution_params = ( + parse_model_with_params(execution_spec) + ) + except Exception as exc: # pragma: no cover - defensive logging + logger.warning( + "Failed to parse execution model specification '%s': %s. Attempting to continue with fallback parsing.", + execution_spec, + exc, + ) + raise ValueError( + f"Invalid execution model specification: '{execution_spec}'. " + f"Error: {exc}. " + "Expected format: backend:model or backend:model?params. " + "Example: qwen-oauth:qwen3-coder-plus?temperature=0.3" + ) from exc + + execution_backend = execution_backend.strip() + execution_model = execution_model.strip() + + if not execution_backend or not execution_model: + raise ValueError( + f"Incomplete execution model specification: '{execution_spec}'. Both backend and model must be non-empty. " + "Example: qwen-oauth:qwen3-coder-plus" + ) + + logger.debug( + "Parsed hybrid model spec: reasoning=%s:%s (params=%s), execution=%s:%s (params=%s)", + reasoning_backend, + reasoning_model, + reasoning_params, + execution_backend, + execution_model, + execution_params, + ) + + return ( + reasoning_backend, + reasoning_model, + reasoning_params, + execution_backend, + execution_model, + execution_params, + ) diff --git a/src/connectors/hybrid/parameters.py b/src/connectors/hybrid/parameters.py new file mode 100644 index 00000000..2f71f0c0 --- /dev/null +++ b/src/connectors/hybrid/parameters.py @@ -0,0 +1,150 @@ +"""Parameter utilities for the hybrid connector.""" + +from __future__ import annotations + +import contextlib +from dataclasses import asdict, is_dataclass +from typing import Any, cast + +from src.connectors.utils.model_capabilities import ( + get_execution_params, + get_reasoning_params, + supports_system_messages, +) +from src.core.config.app_config import AppConfig +from src.core.interfaces.configuration_interface import IAppIdentityConfig +from src.core.interfaces.model_bases import DomainModel, InternalDTO + +from .logging_utils import get_hybrid_logger + + +class HybridParameterMixin: + """Shared helpers for applying hybrid parameter presets.""" + + config: AppConfig + + def _apply_reasoning_params( + self, + request_data: DomainModel | InternalDTO | dict[str, Any], + backend_or_params: str | dict[str, Any], + enable_reasoning: bool | None = None, + ) -> DomainModel | InternalDTO | dict[str, Any]: + """Apply backend-specific reasoning parameters to the request.""" + + if isinstance(backend_or_params, str): + if enable_reasoning is None: + raise TypeError( + "enable_reasoning flag is required when backend name is provided" + ) + params = ( + get_reasoning_params(backend_or_params) + if enable_reasoning + else get_execution_params(backend_or_params) + ) + elif isinstance(backend_or_params, dict): + params = backend_or_params + else: # pragma: no cover - defensive guard + raise TypeError( + "backend_or_params must be a backend string or parameter dictionary" + ) + + return self._apply_parameter_overrides(request_data, params) + + def _apply_parameter_overrides( + self, + request_data: DomainModel | InternalDTO | dict[str, Any], + params: dict[str, Any], + ) -> DomainModel | InternalDTO | dict[str, Any]: + """Apply a parameter dictionary to the request data.""" + + if not params: + return request_data + + logger = get_hybrid_logger() + for key, value in params.items(): + logger.debug("Applying override %s=%s to request", key, value) + + if isinstance(request_data, DomainModel): + current_extra_body = getattr(request_data, "extra_body", None) + new_extra_body = dict(current_extra_body) if current_extra_body else {} + new_extra_body.update(params) + for drop_key in ("backend_type", "model"): + new_extra_body.pop(drop_key, None) + return request_data.model_copy( + update={ + "extra_body": new_extra_body if new_extra_body else None, + **params, + } + ) + + if isinstance(request_data, dict): + request_copy = dict(request_data) + current_extra_body = request_copy.get("extra_body") + new_extra_body = ( + dict(current_extra_body) if isinstance(current_extra_body, dict) else {} + ) + new_extra_body.update(params) + for drop_key in ("backend_type", "model"): + new_extra_body.pop(drop_key, None) + request_copy["extra_body"] = new_extra_body if new_extra_body else None + request_copy.update(params) + return request_copy + + if is_dataclass(request_data) and not isinstance(request_data, type): + request_dict = asdict(request_data) + current_extra_body = request_dict.get("extra_body") + new_extra_body = ( + dict(current_extra_body) if isinstance(current_extra_body, dict) else {} + ) + new_extra_body.update(params) + for drop_key in ("backend_type", "model"): + new_extra_body.pop(drop_key, None) + request_dict["extra_body"] = new_extra_body if new_extra_body else None + request_dict.update(params) + return request_dict + + logger = get_hybrid_logger() + logger.warning( + "Unsupported request_data type in _apply_reasoning_params: %s", + type(request_data).__name__, + ) + return request_data + + def _resolve_backend_identity( + self, + backend: str, + request_identity: IAppIdentityConfig | None, + backend_config: Any = None, + ) -> IAppIdentityConfig | None: + """Resolve identity configuration for backend calls.""" + + if backend_config is not None and getattr(backend_config, "identity", None): + return cast(IAppIdentityConfig, backend_config.identity) + + backend_identity = None + if hasattr(self.config, "backends"): + with contextlib.suppress(AttributeError): + backend_settings = getattr(self.config.backends, backend) + backend_identity = getattr(backend_settings, "identity", None) + if backend_identity is not None: + return cast(IAppIdentityConfig, backend_identity) + + if request_identity is not None: + return request_identity + + return getattr(self.config, "identity", None) + + def get_reasoning_params(self, backend: str = "openai") -> dict[str, Any]: + """Expose reasoning parameter presets for tests and diagnostics.""" + + return get_reasoning_params(backend) + + def get_execution_params(self, backend: str = "openai") -> dict[str, Any]: + """Expose execution parameter presets for tests and diagnostics.""" + + return get_execution_params(backend) + + def _supports_system_messages(self, backend: str) -> bool: + """Check if backend supports system messages.""" + + return supports_system_messages(backend) diff --git a/src/connectors/hybrid/phases.py b/src/connectors/hybrid/phases.py new file mode 100644 index 00000000..cb797f85 --- /dev/null +++ b/src/connectors/hybrid/phases.py @@ -0,0 +1,522 @@ +"""Execution phases for the hybrid connector.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from src.connectors.hybrid.constants import ( + EXECUTION_PHASE_TIMEOUT, + REASONING_PHASE_TIMEOUT, +) +from src.connectors.hybrid.types import ReasoningPhaseResult +from src.connectors.utils.model_capabilities import ( + get_execution_params, + get_reasoning_params, +) +from src.core.common.exceptions import BackendError, ServiceResolutionError +from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope +from src.core.interfaces.configuration_interface import IAppIdentityConfig +from src.core.interfaces.model_bases import DomainModel, InternalDTO + +logger = logging.getLogger(__name__) + + +class HybridPhaseMixin: + """Manage the two-phase hybrid execution flow.""" + + _backend_registry: Any + + async def _execute_reasoning_phase( + self, + messages: list, + reasoning_backend: str, + reasoning_model: str, + request_data: DomainModel | InternalDTO | dict[str, Any], + identity: IAppIdentityConfig | None, + uri_params: dict[str, Any] | None = None, + ) -> ReasoningPhaseResult: + """Execute reasoning phase and capture output and metadata.""" + + logger.info( + "Starting reasoning phase with %s:%s", + reasoning_backend, + reasoning_model, + ) + + if self._backend_registry is None: + logger.error("Backend registry not initialized for reasoning phase") + raise BackendError( + message="Backend registry not initialized", + code="backend_registry_not_initialized", + details={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + }, + ) + + reasoning_preset_params = get_reasoning_params(reasoning_backend) + reasoning_request = self._apply_reasoning_params( + request_data, reasoning_preset_params + ) + + if uri_params: + try: + from src.core.services.uri_parameter_validator import ( + URIParameterValidator, + ) + + validator = URIParameterValidator() + normalized_params, validation_errors = validator.validate_and_normalize( + uri_params + ) + + if validation_errors: + logger.warning( + "URI parameter validation errors for reasoning phase (%s:%s): %s. Invalid parameters will be excluded.", + reasoning_backend, + reasoning_model, + ", ".join(validation_errors), + ) + + if normalized_params: + reasoning_request = self._apply_parameter_overrides( + reasoning_request, normalized_params + ) + except Exception as param_error: # pragma: no cover - defensive + logger.warning( + "Failed to apply URI parameters for reasoning phase (%s:%s): %s. Continuing without URI parameters.", + reasoning_backend, + reasoning_model, + param_error, + ) + + canonical_reasoning_request = self._prepare_backend_request( + reasoning_request, + target_model=f"{reasoning_backend}:{reasoning_model}", + stream=True, + messages=messages, + ) + + extra_body = getattr(canonical_reasoning_request, "extra_body", None) + if isinstance(extra_body, dict): + extra_body_keys = ", ".join(sorted(map(str, extra_body.keys()))) + elif extra_body is None: + extra_body_keys = "None" + else: + extra_body_keys = f"" + + logger.error( + "[HYBRID DEBUG] Prepared reasoning request: model=%s, extra_body_keys=%s", + canonical_reasoning_request.model, + extra_body_keys, + ) + + try: + from src.core.di.services import get_required_service + from src.core.domain.request_context import ( + RequestContext, + RequestCookies, + RequestHeaders, + ) + from src.core.services.backend_service import BackendService + + backend_service = get_required_service(BackendService) + + clean_context = RequestContext( + headers=RequestHeaders(), + cookies=RequestCookies(), + state=None, + app_state=None, + session_id=None, + ) + + try: + response = await asyncio.wait_for( + backend_service.call_completion( + canonical_reasoning_request, + stream=True, + allow_failover=False, + context=clean_context, + ), + timeout=REASONING_PHASE_TIMEOUT, + ) + except TypeError as exc: + if "context" not in str(exc): + raise + response = await asyncio.wait_for( + backend_service.call_completion( + canonical_reasoning_request, + stream=True, + allow_failover=False, + ), + timeout=REASONING_PHASE_TIMEOUT, + ) + + response_media_type = getattr(response, "media_type", None) + response_headers = getattr(response, "headers", None) + + if isinstance(response, StreamingResponseEnvelope) and response.content: + stream = response.content + else: + logger.error( + "Reasoning model did not return streaming response", + extra={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "response_type": type(response).__name__, + }, + ) + raise BackendError( + message="Reasoning model did not return streaming response", + code="reasoning_no_stream", + details={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "response_type": type(response).__name__, + }, + ) + + from src.connectors.utils.reasoning_stream_processor import ( + ReasoningStreamProcessor, + ) + + processor = ReasoningStreamProcessor() + reasoning_text, reasoning_complete, metadata = ( + await processor.capture_reasoning_stream(stream) + ) + tool_calls = metadata.get("tool_calls") or [] + raw_chunks = metadata.get("raw_chunks") or [] + + if hasattr(response, "cancel_callback") and response.cancel_callback: + try: + await response.cancel_callback() + logger.debug( + "Reasoning stream cancelled successfully", + extra={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + }, + ) + except Exception as exc: # pragma: no cover - defensive + logger.debug( + "Stream cancellation failed (non-fatal): %s", + exc, + extra={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "error": str(exc), + }, + ) + + logger.info( + "Reasoning phase complete: %s chars captured, method=%s, chunks=%s", + len(reasoning_text), + metadata.get("method"), + metadata.get("chunks_processed"), + ) + + return ReasoningPhaseResult( + text=reasoning_text, + complete=reasoning_complete, + tool_calls=tool_calls, + raw_chunks=raw_chunks, + media_type=response_media_type, + headers=response_headers, + ) + + except ServiceResolutionError as exc: + logger.error( + "Failed to resolve BackendService for reasoning phase", + extra={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "error": str(exc), + }, + exc_info=True, + ) + raise BackendError( + message=f"Failed to initialize reasoning backend: {exc}", + code="reasoning_backend_init_failed", + details={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + }, + ) from exc + + except asyncio.TimeoutError as exc: + logger.warning( + "Reasoning phase timeout after %ss, attempting to use partial reasoning output", + REASONING_PHASE_TIMEOUT, + extra={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "timeout_seconds": REASONING_PHASE_TIMEOUT, + }, + ) + raise BackendError( + message=f"Reasoning phase timeout after {REASONING_PHASE_TIMEOUT}s", + code="reasoning_timeout", + details={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "timeout_seconds": REASONING_PHASE_TIMEOUT, + }, + ) from exc + + except BackendError: + raise + + except Exception as exc: + logger.error( + "Reasoning phase failed with unexpected error: %s", + type(exc).__name__, + extra={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "error_type": type(exc).__name__, + "error": str(exc), + }, + exc_info=True, + ) + raise BackendError( + message=f"Reasoning phase failed: {exc}", + code="reasoning_phase_failed", + details={ + "phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "error_type": type(exc).__name__, + }, + ) from exc + + async def _execute_execution_phase( + self, + request_data: DomainModel | InternalDTO | dict[str, Any], + augmented_messages: list, + execution_backend: str, + execution_model: str, + identity: IAppIdentityConfig | None, + reasoning_output_length: int = 0, + uri_params: dict[str, Any] | None = None, + **kwargs: Any, + ) -> ResponseEnvelope | StreamingResponseEnvelope: + """Execute execution phase with augmented messages.""" + + logger.info( + "Starting execution phase with %s:%s", + execution_backend, + execution_model, + extra={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + }, + ) + + if self._backend_registry is None: + logger.error("Backend registry not initialized for execution phase") + raise BackendError( + message="Backend registry not initialized", + code="backend_registry_not_initialized", + details={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + }, + ) + + try: + from src.core.di.services import get_required_service + from src.core.services.backend_factory import BackendFactory + + backend_factory_instance = get_required_service(BackendFactory) + + execution_backend_config = None + if hasattr(self.config, "backends"): + import contextlib + + with contextlib.suppress(AttributeError): + execution_backend_config = getattr( + self.config.backends, execution_backend + ) + + execution_identity = self._resolve_backend_identity( + execution_backend, identity, execution_backend_config + ) + + execution_connector = await backend_factory_instance.ensure_backend( + execution_backend, self.config, execution_backend_config + ) + + except ValueError as exc: + logger.error( + "Execution backend '%s' not found in registry", + execution_backend, + extra={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + "error": str(exc), + }, + ) + raise BackendError( + message=f"Execution backend '{execution_backend}' not found: {exc}", + code="execution_backend_not_found", + details={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + }, + ) from exc + + except Exception as exc: + logger.error( + "Failed to initialize execution backend '%s'", + execution_backend, + extra={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + "error": str(exc), + }, + exc_info=True, + ) + raise BackendError( + message=f"Failed to initialize execution backend: {exc}", + code="execution_backend_init_failed", + details={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + }, + ) from exc + + execution_preset_params = get_execution_params(execution_backend) + execution_request = self._apply_reasoning_params( + request_data, execution_preset_params + ) + + if uri_params: + try: + from src.core.services.uri_parameter_validator import ( + URIParameterValidator, + ) + + validator = URIParameterValidator() + normalized_params, validation_errors = validator.validate_and_normalize( + uri_params + ) + + if validation_errors: + logger.warning( + "URI parameter validation errors for execution phase (%s:%s): %s. Invalid parameters will be excluded.", + execution_backend, + execution_model, + ", ".join(validation_errors), + ) + + if normalized_params: + execution_request = self._apply_parameter_overrides( + execution_request, normalized_params + ) + except Exception as param_error: # pragma: no cover - defensive + logger.warning( + "Failed to apply URI parameters for execution phase (%s:%s): %s. Continuing without URI parameters.", + execution_backend, + execution_model, + param_error, + ) + + try: + response = await asyncio.wait_for( + execution_connector.chat_completions( + request_data=execution_request, + processed_messages=augmented_messages, + effective_model=execution_model, + identity=execution_identity, + **kwargs, + ), + timeout=EXECUTION_PHASE_TIMEOUT, + ) + + logger.info( + "Execution phase complete", + extra={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + }, + ) + + return response + + except asyncio.TimeoutError as exc: + logger.error( + "Execution phase timeout after %ss", + EXECUTION_PHASE_TIMEOUT, + extra={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + "timeout_seconds": EXECUTION_PHASE_TIMEOUT, + }, + ) + raise BackendError( + message=f"Execution phase timeout after {EXECUTION_PHASE_TIMEOUT}s", + code="execution_timeout", + details={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + "timeout_seconds": EXECUTION_PHASE_TIMEOUT, + }, + ) from exc + + except BackendError: + raise + + except Exception as exc: + logger.error( + "Execution phase failed with unexpected error: %s", + type(exc).__name__, + extra={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + "error_type": type(exc).__name__, + "error": str(exc), + }, + exc_info=True, + ) + raise BackendError( + message=f"Execution phase failed: {exc}", + code="execution_phase_failed", + details={ + "phase": "execution", + "execution_backend": execution_backend, + "execution_model": execution_model, + "reasoning_output_length": reasoning_output_length, + "error_type": type(exc).__name__, + }, + ) from exc diff --git a/src/connectors/hybrid/reasoning_markup.py b/src/connectors/hybrid/reasoning_markup.py new file mode 100644 index 00000000..1ecd08e1 --- /dev/null +++ b/src/connectors/hybrid/reasoning_markup.py @@ -0,0 +1,106 @@ +"""Reasoning markup helpers for the hybrid connector.""" + +from __future__ import annotations + +import logging +import re + +from src.connectors.utils.model_capabilities import get_reasoning_tags + +logger = logging.getLogger(__name__) + + +class HybridReasoningMarkupMixin: + """Utilities for working with reasoning markup.""" + + _LEADING_REASONING_TAG = re.compile( + r"^\s*<\s*(?:think|thinking|reason|reasoning)\b[^>]*>\s*", re.IGNORECASE + ) + _TRAILING_REASONING_TAG = re.compile( + r"\s*<\s*/\s*(?:think|thinking|reason|reasoning)\b[^>]*>\s*$", + re.IGNORECASE, + ) + + @staticmethod + def _assemble_reasoning_markup( + opening_tag: str, closing_tag: str, body: str + ) -> str: + """Rebuild reasoning text with canonical tags.""" + + if not body: + return f"{opening_tag}{closing_tag}" + + if "\n" in body or body.startswith("<"): + return f"{opening_tag}\n{body}\n{closing_tag}" + + return f"{opening_tag}{body}{closing_tag}" + + def _truncate_after_reasoning_close(self, reasoning_output: str) -> str: + """Trim reasoning output so that only the thinking segment remains.""" + + closing_tags = ["", "", "", ""] + for tag in closing_tags: + index = reasoning_output.find(tag) + if index != -1: + return reasoning_output[: index + len(tag)] + return reasoning_output + + def _normalize_reasoning_markup( + self, reasoning_output: str, opening_tag: str, closing_tag: str + ) -> str: + """Normalize reasoning markup to use canonical tags and ensure closure.""" + + truncated = self._truncate_after_reasoning_close(reasoning_output) + stripped = truncated.strip() + if not stripped: + return stripped + + leading_match = self._LEADING_REASONING_TAG.match(stripped) + body_start = leading_match.end() if leading_match else 0 + body_section = stripped[body_start:] + + trailing_match = self._TRAILING_REASONING_TAG.search(body_section) + if trailing_match: + body_end = trailing_match.start() + body_section = body_section[:body_end] + + body = body_section.strip() + return self._assemble_reasoning_markup(opening_tag, closing_tag, body) + + def _apply_reasoning_tag_wrapping( + self, reasoning_output: str, opening_tag: str, closing_tag: str + ) -> str: + """Wrap or normalize reasoning output using backend-specific tags.""" + + return self._normalize_reasoning_markup( + reasoning_output, opening_tag, closing_tag + ) + + @staticmethod + def _extract_reasoning_inner_text(text: str) -> str: + """Strip XML-like tags and return inner text for reasoning payloads.""" + + if not text: + return "" + + return re.sub(r"<[^>]+>", "", text).strip() + + def _has_reasoning_content(self, formatted_reasoning: str) -> bool: + """Determine whether the formatted reasoning contains substantive text.""" + + return bool(self._extract_reasoning_inner_text(formatted_reasoning)) + + def _prepare_reasoning_texts( + self, reasoning_output: str, backend: str + ) -> tuple[str, str]: + """Return backend-tagged reasoning and plain text representations.""" + + if not reasoning_output: + return "", "" + + opening_tag, closing_tag = get_reasoning_tags(backend) + tagged_reasoning = self._apply_reasoning_tag_wrapping( + reasoning_output, opening_tag, closing_tag + ) + plain_text = self._extract_reasoning_inner_text(tagged_reasoning) + return tagged_reasoning, plain_text diff --git a/src/connectors/hybrid/request_preparation.py b/src/connectors/hybrid/request_preparation.py new file mode 100644 index 00000000..c069f39b --- /dev/null +++ b/src/connectors/hybrid/request_preparation.py @@ -0,0 +1,79 @@ +"""Request preparation helpers for the hybrid connector.""" + +from __future__ import annotations + +from dataclasses import asdict, is_dataclass +from typing import Any, cast + +from src.core.interfaces.model_bases import DomainModel, InternalDTO + + +class HybridRequestPreparationMixin: + """Normalize requests before delegating to backend services.""" + + translation_service: Any + + def _prepare_backend_request( + self, + request_data: DomainModel | InternalDTO | dict[str, Any], + target_model: str, + stream: bool, + messages: list | None = None, + ) -> CanonicalChatRequest: + """Normalize request for backend service calls.""" + + from src.core.domain.chat import CanonicalChatRequest, ChatRequest + + request_obj: Any = request_data + + if hasattr(request_obj, "model_copy"): + request_obj = request_obj.model_copy( + update={"model": target_model, "stream": stream} + ) + elif isinstance(request_obj, dict): + request_dict = dict(request_obj) + request_dict["model"] = target_model + request_dict["stream"] = stream + request_obj = self.translation_service.to_domain_request( + request_dict, "openai" + ) + elif is_dataclass(request_obj) and not isinstance(request_obj, type): + request_dict = asdict(request_obj) + request_dict["model"] = target_model + request_dict["stream"] = stream + request_obj = self.translation_service.to_domain_request( + request_dict, "openai" + ) + elif isinstance(request_obj, ChatRequest): + request_obj = request_obj.model_copy( + update={"model": target_model, "stream": stream} + ) + else: + raise TypeError( + "Unable to prepare backend request from type " + f"{type(request_obj).__name__}" + ) + + if not isinstance(request_obj, CanonicalChatRequest): + request_obj = self.translation_service.to_domain_request( + request_obj, "openai" + ) + + if messages is not None: + request_obj = request_obj.model_copy(update={"messages": messages}) + + if request_obj.extra_body and isinstance(request_obj.extra_body, dict): + keys_to_strip = {"session_id", "backend_type", "model"} + cleaned_extra_body = { + key: value + for key, value in request_obj.extra_body.items() + if key not in keys_to_strip + } + if len(cleaned_extra_body) != len(request_obj.extra_body): + request_obj = request_obj.model_copy( + update={ + "extra_body": cleaned_extra_body if cleaned_extra_body else None + } + ) + + return cast("CanonicalChatRequest", request_obj) diff --git a/src/connectors/hybrid/response_filtering.py b/src/connectors/hybrid/response_filtering.py new file mode 100644 index 00000000..88de2449 --- /dev/null +++ b/src/connectors/hybrid/response_filtering.py @@ -0,0 +1,387 @@ +"""Response filtering and augmentation utilities for the hybrid connector.""" + +from __future__ import annotations + +import json +import logging +import re +import time +import uuid +from copy import deepcopy +from typing import Any + +from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope +from src.core.interfaces.response_processor_interface import ProcessedResponse + +logger = logging.getLogger(__name__) + + +class HybridResponseFilteringMixin: + """Helpers for filtering reasoning output in responses.""" + + def _strip_reasoning_tags(self, content: str) -> str: + """Strip reasoning tags from content.""" + + reasoning_patterns = [ + r".*?", + r".*?", + r".*?", + r".*?", + ] + + cleaned_content = content + for pattern in reasoning_patterns: + cleaned_content = re.sub( + pattern, "", cleaned_content, flags=re.DOTALL | re.IGNORECASE + ) + + instruction_pattern = ( + r"Consider this reasoning when formulating your response:\s*" + ) + cleaned_content = re.sub( + instruction_pattern, "", cleaned_content, flags=re.IGNORECASE + ) + + return cleaned_content + + def _filter_response_content(self, content: Any) -> Any: + """Filter reasoning tags from response content.""" + + if isinstance(content, bytes): + try: + content_str = content.decode("utf-8") + except UnicodeDecodeError: # pragma: no cover - defensive + return content + elif isinstance(content, str): + content_str = content + elif isinstance(content, dict): + return self._filter_json_content(content) + elif isinstance(content, list): + return [self._filter_response_content(item) for item in content] + else: + return content + + if content_str.startswith("data: "): + data_part = content_str[6:].strip() + if data_part == "[DONE]": + return content + + try: + data = json.loads(data_part) + cleaned = self._filter_json_content(data) + filtered_data = json.dumps(cleaned, ensure_ascii=False) + if isinstance(content, bytes): + return f"data: {filtered_data}\n\n".encode() + return f"data: {filtered_data}\n\n" + except json.JSONDecodeError: + filtered_str = self._strip_reasoning_tags(content_str) + if isinstance(content, bytes): + return filtered_str.encode("utf-8") + return filtered_str + + filtered_str = self._strip_reasoning_tags(content_str) + if isinstance(content, bytes): + return filtered_str.encode("utf-8") + return filtered_str + + def _filter_json_content(self, data: Any) -> Any: + """Recursively remove reasoning content from JSON-like structures.""" + + if isinstance(data, dict): + filtered: dict[str, Any] = {} + for key, value in data.items(): + if key == "reasoning_content": + continue + filtered[key] = self._filter_json_content(value) + return filtered + + if isinstance(data, list): + return [self._filter_json_content(item) for item in data] + + if isinstance(data, str): + return self._strip_reasoning_tags(data) + + return data + + async def _filter_response_stream( + self, response: StreamingResponseEnvelope + ) -> StreamingResponseEnvelope: + """Filter reasoning tags from streaming response.""" + + async def filtered_stream(): + if response.content is None: + return + + async for chunk in response.content: + filtered_content = self._filter_response_content(chunk.content) + filtered_chunk = ProcessedResponse( + content=filtered_content, + usage=chunk.usage, + metadata=chunk.metadata, + ) + yield filtered_chunk + + return StreamingResponseEnvelope( + content=filtered_stream(), + media_type=response.media_type, + headers=response.headers, + cancel_callback=response.cancel_callback, + ) + + def _format_reasoning_for_client( + self, reasoning_output: str, reasoning_backend: str + ) -> str: + """Prepare reasoning text for client consumption with native tags.""" + + if not reasoning_output: + return "" + + _, plain_text = self._prepare_reasoning_texts( + reasoning_output, reasoning_backend + ) + return plain_text + + def _build_reasoning_stream_chunk( + self, + reasoning_output: str, + reasoning_backend: str, + reasoning_model: str, + formatted_reasoning: str | None = None, + ) -> ProcessedResponse | None: + """Create a processed response chunk that surfaces reasoning to clients.""" + + formatted = ( + formatted_reasoning.strip() + if formatted_reasoning + and "<" in formatted_reasoning + and ">" in formatted_reasoning + else "" + ) + if not formatted: + formatted, plain_reasoning = self._prepare_reasoning_texts( + reasoning_output, reasoning_backend + ) + else: + plain_reasoning = self._extract_reasoning_inner_text(formatted) + + if formatted_reasoning and formatted_reasoning.strip() and not formatted: + plain_reasoning = formatted_reasoning.strip() + formatted, _ = self._prepare_reasoning_texts( + reasoning_output, reasoning_backend + ) + if not formatted: + formatted = formatted_reasoning.strip() + + if not plain_reasoning: + return None + + delta_payload: dict[str, Any] = { + "role": "assistant", + "reasoning": formatted, + "reasoning_content": plain_reasoning, + "content": "", + } + + payload = { + "id": f"hybrid-reasoning-{uuid.uuid4().hex}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": f"{reasoning_backend}:{reasoning_model}", + "choices": [ + { + "index": 0, + "delta": delta_payload, + "finish_reason": None, + } + ], + } + + sse_payload = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" + + return ProcessedResponse( + content=sse_payload, + usage=None, + metadata={ + "hybrid_phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + }, + ) + + def _build_tool_call_only_response( + self, + tool_calls: list[dict[str, Any]], + request_dict: dict[str, Any], + reasoning_backend: str, + reasoning_model: str, + ) -> ResponseEnvelope | StreamingResponseEnvelope: + """Construct a response that forwards tool calls without execution.""" + + stream_requested = bool(request_dict.get("stream", False)) + created_ts = int(time.time()) + model_name = f"{reasoning_backend}:{reasoning_model}" + + if stream_requested: + payload = { + "id": f"hybrid-tool-call-{uuid.uuid4().hex}", + "object": "chat.completion.chunk", + "created": created_ts, + "model": model_name, + "choices": [ + { + "index": 0, + "delta": { + "role": "assistant", + "content": "", + "tool_calls": tool_calls, + }, + "finish_reason": "tool_calls", + } + ], + } + sse_payload = f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" + done_payload = "data: [DONE]\n\n" + + async def tool_call_stream(): + yield ProcessedResponse( + content=sse_payload, + metadata={ + "hybrid_phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "skipped_execution": True, + }, + ) + yield ProcessedResponse( + content=done_payload, + metadata={"is_done": True}, + ) + + return StreamingResponseEnvelope( + content=tool_call_stream(), + media_type="text/event-stream", + ) + + response_content = { + "id": f"hybrid-tool-call-{uuid.uuid4().hex}", + "object": "chat.completion", + "created": created_ts, + "model": model_name, + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "", + "tool_calls": tool_calls, + }, + "finish_reason": "tool_calls", + } + ], + } + + return ResponseEnvelope( + content=response_content, + metadata={ + "hybrid_phase": "reasoning", + "reasoning_backend": reasoning_backend, + "reasoning_model": reasoning_model, + "skipped_execution": True, + }, + ) + + def _prepend_reasoning_chunk_to_stream( + self, + response: StreamingResponseEnvelope, + reasoning_output: str, + reasoning_backend: str, + reasoning_model: str, + formatted_reasoning: str | None = None, + ) -> StreamingResponseEnvelope: + """Inject the reasoning chunk ahead of the execution stream.""" + + reasoning_chunk = self._build_reasoning_stream_chunk( + reasoning_output, + reasoning_backend, + reasoning_model, + formatted_reasoning=formatted_reasoning, + ) + if reasoning_chunk is None: + return response + + original_stream = response.content + + async def combined_stream(): + yield reasoning_chunk + if original_stream is None: + return + async for chunk in original_stream: + yield chunk + + return StreamingResponseEnvelope( + content=combined_stream(), + media_type=response.media_type, + headers=response.headers, + cancel_callback=response.cancel_callback, + ) + + def _prepend_reasoning_to_non_streaming_content( + self, + content: Any, + reasoning_output: str, + reasoning_backend: str, + reasoning_model: str, + formatted_reasoning: str | None = None, + ) -> Any: + """Attach reasoning output to non-streaming responses.""" + + tagged, plain = self._prepare_reasoning_texts( + reasoning_output, reasoning_backend + ) + if formatted_reasoning: + candidate = formatted_reasoning.strip() + if "<" in candidate and ">" in candidate: + tagged = candidate + plain = self._extract_reasoning_inner_text(candidate) or plain + elif candidate: + plain = candidate + if not plain or not tagged: + return content + + if isinstance(content, bytes): + return content + + if isinstance(content, str): + return content + + if isinstance(content, dict): + updated = deepcopy(content) + choices = updated.get("choices") + if isinstance(choices, list): + for choice in choices: + if not isinstance(choice, dict): + continue + + message = choice.get("message") + if isinstance(message, dict): + if "role" not in message: + message["role"] = "assistant" + message["reasoning"] = tagged + message["reasoning_content"] = plain + continue + + delta = choice.get("delta") + if isinstance(delta, dict): + if "role" not in delta: + delta["role"] = "assistant" + delta["reasoning"] = tagged + delta["reasoning_content"] = plain + else: + metadata = updated.setdefault("metadata", {}) + if isinstance(metadata, dict): + metadata["reasoning"] = tagged + metadata["reasoning_content"] = plain + metadata.setdefault("reasoning_format", "hybrid_injected") + return updated + + return content diff --git a/src/connectors/hybrid/types.py b/src/connectors/hybrid/types.py new file mode 100644 index 00000000..911d5328 --- /dev/null +++ b/src/connectors/hybrid/types.py @@ -0,0 +1,25 @@ +"""Shared types for the hybrid connector implementation.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from src.core.interfaces.response_processor_interface import ProcessedResponse + + +@dataclass +class ReasoningPhaseResult: + """Container for reasoning phase outcome.""" + + text: str + complete: bool + tool_calls: list[dict[str, Any]] + raw_chunks: list[ProcessedResponse] + media_type: str | None + headers: dict[str, str] | None + + def has_tool_calls(self) -> bool: + """Check whether reasoning produced any tool calls.""" + + return bool(self.tool_calls) diff --git a/src/connectors/hybrid/utils.py b/src/connectors/hybrid/utils.py new file mode 100644 index 00000000..2d1d3f78 --- /dev/null +++ b/src/connectors/hybrid/utils.py @@ -0,0 +1,66 @@ +"""Utility helpers for the hybrid connector.""" + +from __future__ import annotations + +from typing import Any + + +class HybridConnectorUtilsMixin: + """Utility methods shared by the hybrid connector.""" + + @staticmethod + def _extract_message_role(message: Any) -> str | None: + """Best-effort extraction of a message role.""" + + role = getattr(message, "role", None) + if isinstance(role, str) and role: + return role + + if isinstance(message, dict): + role_value = message.get("role") + return role_value if isinstance(role_value, str) else None + + if hasattr(message, "model_dump") and callable(message.model_dump): + try: + dumped = message.model_dump() + role_value = dumped.get("role") + if isinstance(role_value, str): + return role_value + except Exception: # pragma: no cover - defensive + return None + + if hasattr(message, "get") and callable(message.get): + try: + role_value = message.get("role") + if isinstance(role_value, str): + return role_value + except Exception: # pragma: no cover - defensive + return None + + return None + + def _is_first_user_turn( + self, + processed_messages: list[Any] | None, + request_messages: list[Any] | None, + ) -> bool: + """Determine whether the current request represents the first user turn.""" + + messages_to_check: list[Any] = [] + if processed_messages: + messages_to_check = list(processed_messages) + elif request_messages: + messages_to_check = list(request_messages) + + if not messages_to_check: + return True + + for message in messages_to_check: + role = self._extract_message_role(message) + if not role: + continue + normalized_role = role.strip().lower() + if normalized_role in {"assistant", "tool", "function"}: + return False + + return True