-from __future__ import annotations
-
-import json
-import logging
-from typing import Any
-
-import src.core.services.metrics_service as metrics
-from src.core.config.app_config import AppConfig
-from src.core.interfaces.response_processor_interface import (
-    IResponseMiddleware,
-)
-from src.core.services.json_repair_service import JsonRepairService
-
-logger = logging.getLogger(__name__)
-
-
-class JsonRepairMiddleware(IResponseMiddleware):
-    """
-    Middleware to detect and repair JSON in LLM responses.
-    """
-
-    def __init__(
-        self, config: AppConfig, json_repair_service: JsonRepairService
-    ) -> None:
-        self.config = config
-        self.json_repair_service = json_repair_service
-
-    async def process(
-        self,
-        response: Any,
-        session_id: str,
-        context: dict[str, Any],
-        is_streaming: bool = False,
-        stop_event: Any = None,
-    ) -> Any:
-        """
-        Processes the response to detect and repair JSON if enabled.
-        """
-        if not self.config.session.json_repair_enabled:
-            return response
-
-        # Skip for streaming chunks; handled by JsonRepairProcessor in pipeline
-        if context.get("response_type") == "stream":
-            return response
-
-        if isinstance(response.content, str):
-            # Gate strict mode for non-streaming repairs based on intent
-            headers_raw = response.metadata.get("headers")
-            headers: dict[str, Any] = (
-                headers_raw if isinstance(headers_raw, dict) else {}
-            )
-            ct_raw = response.metadata.get("content_type")
-            content_type = (
-                ct_raw
-                if isinstance(ct_raw, str)
-                else headers.get("Content-Type") or headers.get("content-type")
-            )
-            is_json_ct = (
-                isinstance(content_type, str)
-                and "application/json" in content_type.lower()
-            )
-            expected_json = bool(context.get("expected_json"))
-            has_schema = self.config.session.json_repair_schema is not None
-            strict_effective = (
-                bool(self.config.session.json_repair_strict_mode)
-                or is_json_ct
-                or expected_json
-                or has_schema
-            )
-
-            try:
-                repaired_json = self.json_repair_service.repair_and_validate_json(
-                    response.content,
-                    schema=self.config.session.json_repair_schema,
-                    strict=strict_effective,
-                )
-                if repaired_json is not None:
-                    metrics.inc(
-                        "json_repair.non_streaming.strict_success"
-                        if strict_effective
-                        else "json_repair.non_streaming.best_effort_success"
-                    )
-                else:
-                    metrics.inc(
-                        "json_repair.non_streaming.strict_fail"
-                        if strict_effective
-                        else "json_repair.non_streaming.best_effort_fail"
-                    )
-            except Exception:
-                metrics.inc(
-                    "json_repair.non_streaming.strict_fail"
-                    if strict_effective
-                    else "json_repair.non_streaming.best_effort_fail"
-                )
-                raise
-            if repaired_json is not None:
-                logger.info(f"JSON detected and repaired for session {session_id}")
-                response.content = json.dumps(repaired_json)
-                response.metadata["repaired"] = True
-
-        return response
+from __future__ import annotations
+
+import json
+import logging
+from typing import Any
+
+import src.core.services.metrics_service as metrics
+from src.core.config.app_config import AppConfig
+from src.core.interfaces.response_processor_interface import (
+    IResponseMiddleware,
+)
+from src.core.services.json_repair_service import (
+    JsonRepairResult,
+    JsonRepairService,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class JsonRepairMiddleware(IResponseMiddleware):
+    """
+    Middleware to detect and repair JSON in LLM responses.
+    """
+
+    def __init__(
+        self, config: AppConfig, json_repair_service: JsonRepairService
+    ) -> None:
+        self.config = config
+        self.json_repair_service = json_repair_service
+
+    async def process(
+        self,
+        response: Any,
+        session_id: str,
+        context: dict[str, Any],
+        is_streaming: bool = False,
+        stop_event: Any = None,
+    ) -> Any:
+        """
+        Processes the response to detect and repair JSON if enabled.
+        """
+        if not self.config.session.json_repair_enabled:
+            return response
+
+        # Skip for streaming chunks; handled by JsonRepairProcessor in pipeline
+        if context.get("response_type") == "stream":
+            return response
+
+        if isinstance(response.content, str):
+            # Gate strict mode for non-streaming repairs based on intent
+            headers_raw = response.metadata.get("headers")
+            headers: dict[str, Any] = (
+                headers_raw if isinstance(headers_raw, dict) else {}
+            )
+            ct_raw = response.metadata.get("content_type")
+            content_type = (
+                ct_raw
+                if isinstance(ct_raw, str)
+                else headers.get("Content-Type") or headers.get("content-type")
+            )
+            is_json_ct = (
+                isinstance(content_type, str)
+                and "application/json" in content_type.lower()
+            )
+            expected_json = bool(context.get("expected_json"))
+            has_schema = self.config.session.json_repair_schema is not None
+            strict_effective = (
+                bool(self.config.session.json_repair_strict_mode)
+                or is_json_ct
+                or expected_json
+                or has_schema
+            )
+
+            try:
+                repair_result: JsonRepairResult = (
+                    self.json_repair_service.repair_and_validate_json(
+                        response.content,
+                        schema=self.config.session.json_repair_schema,
+                        strict=strict_effective,
+                    )
+                )
+                metric_suffix = (
+                    "strict_success"
+                    if strict_effective and repair_result.success
+                    else (
+                        "best_effort_success"
+                        if repair_result.success
+                        else ("strict_fail" if strict_effective else "best_effort_fail")
+                    )
+                )
+                metrics.inc(f"json_repair.non_streaming.{metric_suffix}")
+            except Exception:
+                metrics.inc(
+                    "json_repair.non_streaming.strict_fail"
+                    if strict_effective
+                    else "json_repair.non_streaming.best_effort_fail"
+                )
+                raise
+            if repair_result.success:
+                if logger.isEnabledFor(logging.INFO):
+                    logger.info(f"JSON detected and repaired for session {session_id}")
+                response.content = json.dumps(repair_result.content)
+                response.metadata["repaired"] = True
+
+        return response
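
For context on how the new `JsonRepairResult` return shape is consumed, here is a minimal, hypothetical sketch of exercising the middleware in isolation. The import path for `JsonRepairMiddleware` is assumed (the diff does not show the file's location), the config and response objects are `SimpleNamespace` stand-ins rather than the project's real `AppConfig` and response types, and the repair service is mocked to return an object exposing the `success` and `content` attributes the middleware reads.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import MagicMock

# Assumed import path; the diff does not show where this module lives.
from src.core.app.middleware.json_repair_middleware import JsonRepairMiddleware


async def main() -> None:
    # Stand-in for AppConfig, carrying only the session flags the middleware reads.
    config = SimpleNamespace(
        session=SimpleNamespace(
            json_repair_enabled=True,
            json_repair_strict_mode=False,
            json_repair_schema=None,
        )
    )
    # Stand-in response envelope: malformed JSON (trailing comma) plus a JSON
    # content type, which makes strict_effective evaluate to True.
    response = SimpleNamespace(
        content='{"answer": 42,}',
        metadata={"content_type": "application/json"},
    )
    # Mock the repair service; the middleware only needs an object with
    # success/content attributes back from repair_and_validate_json.
    service = MagicMock()
    service.repair_and_validate_json.return_value = SimpleNamespace(
        success=True, content={"answer": 42}
    )

    middleware = JsonRepairMiddleware(config, service)  # type: ignore[arg-type]
    processed = await middleware.process(response, "session-1", context={})

    assert processed.metadata["repaired"] is True
    print(processed.content)  # '{"answer": 42}'


asyncio.run(main())
```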