Commit 5c8db68

Handle empty Gemini streams by raising backend error

Author: Mateusz
1 parent 151eb09 · commit 5c8db68

1 file changed: +63 −98 lines

src/core/services/backend_request_manager_service.py (63 additions, 98 deletions)
@@ -7,15 +7,15 @@
 from __future__ import annotations
 
 import logging
-import time
-from collections.abc import AsyncIterator, Iterable
-from typing import Any, cast
-
-from src.core.domain.chat import ChatMessage, ChatRequest
-from src.core.domain.processed_result import ProcessedResult
-from src.core.domain.request_context import RequestContext
-from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope
-from src.core.interfaces.backend_processor_interface import IBackendProcessor
+from collections.abc import AsyncIterator, Iterable
+from typing import Any, cast
+
+from src.core.common.exceptions import BackendError
+from src.core.domain.chat import ChatMessage, ChatRequest
+from src.core.domain.processed_result import ProcessedResult
+from src.core.domain.request_context import RequestContext
+from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope
+from src.core.interfaces.backend_processor_interface import IBackendProcessor
 from src.core.interfaces.backend_request_manager_interface import IBackendRequestManager
 from src.core.interfaces.loop_detector_interface import ILoopDetector
 from src.core.interfaces.response_processor_interface import (
@@ -357,10 +357,9 @@ async def _process_streaming_response(
                 "Maximum empty stream recovery attempts reached for session %s",
                 session_id,
             )
-            return await self._build_fallback_stream(
-                stream_envelope=stream_envelope,
+            self._raise_empty_stream_error(
                 session_id=session_id,
-                reason="Empty streaming response after maximum retries",
+                reason="empty_stream_after_retries",
             )
 
         original_stream = stream_envelope.content
@@ -504,10 +503,9 @@ async def _retry_stream_with_recovery(
                 reason,
                 session_id,
             )
-            return await self._build_fallback_stream(
-                stream_envelope=stream_envelope,
+            self._raise_empty_stream_error(
                 session_id=session_id,
-                reason=reason,
+                reason="empty_stream_retry_failure",
             )
 
         logger.info("%s", reason)
@@ -593,41 +591,49 @@ def _extract_text_from_chunk(chunk: ProcessedResponse | bytes) -> str:
         import json
 
         # Handle case where chunk is raw bytes (from streaming)
-        if isinstance(chunk, bytes):
-            try:
-                decoded = chunk.decode("utf-8")
-                # Try to parse as JSON to extract content
-                data = json.loads(decoded)
-                if isinstance(data, dict):
-                    choices = data.get("choices")
-                    if isinstance(choices, list) and choices:
-                        choice = choices[0]
-                        if isinstance(choice, dict):
-                            delta = choice.get("delta")
-                            if isinstance(delta, dict):
-                                content = delta.get("content")
-                                if isinstance(content, str):
-                                    return content
-                                if isinstance(content, list):
-                                    fragments_bytes: list[str] = []
-                                    for part in content:
-                                        if isinstance(part, str):
-                                            fragments_bytes.append(part)
-                                        elif isinstance(part, dict):
-                                            text_part = part.get("text")
-                                            if isinstance(text_part, str):
-                                                fragments_bytes.append(text_part)
-                                    if fragments_bytes:
-                                        return "".join(fragments_bytes)
-                            message = choice.get("message")
-                            if isinstance(message, dict):
-                                msg_content = message.get("content")
-                                if isinstance(msg_content, str):
-                                    return msg_content
-            except (UnicodeDecodeError, json.JSONDecodeError):
-                # If decoding or parsing fails, return empty string
-                return ""
-            return ""
+        if isinstance(chunk, bytes):
+            try:
+                decoded = chunk.decode("utf-8")
+            except UnicodeDecodeError:
+                return ""
+
+            for line in decoded.splitlines():
+                if not line.startswith("data: "):
+                    continue
+                payload = line[6:].strip()
+                if payload == "[DONE]":
+                    continue
+                try:
+                    data = json.loads(payload)
+                except json.JSONDecodeError:
+                    continue
+                if isinstance(data, dict):
+                    choices = data.get("choices")
+                    if isinstance(choices, list) and choices:
+                        choice = choices[0]
+                        if isinstance(choice, dict):
+                            delta = choice.get("delta") or {}
+                            if isinstance(delta, dict):
+                                content = delta.get("content")
+                                if isinstance(content, str):
+                                    return content
+                                if isinstance(content, list):
+                                    fragments_bytes: list[str] = []
+                                    for part in content:
+                                        if isinstance(part, str):
+                                            fragments_bytes.append(part)
+                                        elif isinstance(part, dict):
+                                            text_part = part.get("text")
+                                            if isinstance(text_part, str):
+                                                fragments_bytes.append(text_part)
+                                    if fragments_bytes:
+                                        return "".join(fragments_bytes)
+                            message = choice.get("message")
+                            if isinstance(message, dict):
+                                msg_content = message.get("content")
+                                if isinstance(msg_content, str):
+                                    return msg_content
+            return ""
 
         # Handle case where chunk is a ProcessedResponse object
         if isinstance(chunk, ProcessedResponse):
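
Worth noting why this branch was rewritten: the old code fed the entire decoded chunk to `json.loads`, so SSE-framed payloads (`data: {...}\n\n`) always raised `JSONDecodeError` and the chunk was reported as empty, which is presumably how live Gemini streams ended up misclassified as empty. The new code walks the chunk line by line instead. Below is a self-contained sketch of that loop, reduced to the string-delta case (the standalone helper name is hypothetical; in the commit this logic is inline in `_extract_text_from_chunk`, which additionally handles list-form content parts and `message.content`):

```python
import json


def extract_sse_text(chunk: bytes) -> str:
    # Decode, walk SSE lines, keep only "data: " payloads, skip the
    # "[DONE]" sentinel, and tolerate malformed JSON per line rather
    # than abandoning the whole chunk.
    try:
        decoded = chunk.decode("utf-8")
    except UnicodeDecodeError:
        return ""
    for line in decoded.splitlines():
        if not line.startswith("data: "):
            continue
        payload = line[6:].strip()
        if payload == "[DONE]":
            continue
        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            continue
        choices = data.get("choices") if isinstance(data, dict) else None
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            delta = choices[0].get("delta") or {}
            content = delta.get("content") if isinstance(delta, dict) else None
            if isinstance(content, str):
                return content
    return ""


chunk = b'data: {"choices": [{"delta": {"content": "Hello"}}]}\n\ndata: [DONE]\n'
print(extract_sse_text(chunk))          # -> Hello
print(extract_sse_text(b"not sse"))     # -> "" (no data: lines, no crash)
```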
@@ -662,52 +668,11 @@ def _extract_text_from_chunk(chunk: ProcessedResponse | bytes) -> str:
                     return msg_content
         return ""
 
-    async def _build_fallback_stream(
-        self,
-        stream_envelope: StreamingResponseEnvelope,
-        session_id: str,
-        reason: str,
-    ) -> StreamingResponseEnvelope:
-        """Generate a synthetic assistant response when recovery fails."""
-        logger.warning(
-            "Returning fallback assistant message for session %s: %s",
-            session_id,
-            reason,
-        )
-
-        payload = {
-            "id": "proxy-empty-response-retry",
-            "object": "chat.completion.chunk",
-            "created": int(time.time()),
-            "model": "proxy-empty-response",
-            "choices": [
-                {
-                    "index": 0,
-                    "delta": {
-                        "role": "assistant",
-                        "content": (
-                            "Proxy notice: the upstream model returned no content "
-                            "after multiple attempts. Please retry or adjust your request."
-                        ),
-                    },
-                    "finish_reason": "stop",
-                }
-            ],
-        }
-
-        async def fallback_stream() -> AsyncIterator[ProcessedResponse]:
-            yield ProcessedResponse(
-                content=payload,
-                metadata={
-                    "proxy_generated": True,
-                    "empty_response_recovery_failed": True,
-                    "recovery_reason": reason,
-                },
-            )
-
-        return StreamingResponseEnvelope(
-            content=fallback_stream(),
-            media_type=stream_envelope.media_type,
-            headers=stream_envelope.headers,
-            cancel_callback=stream_envelope.cancel_callback,
+    def _raise_empty_stream_error(self, session_id: str, reason: str) -> None:
+        """Raise a backend error when no content is produced after retries."""
+        raise BackendError(
+            message="Upstream model returned no content after retries",
+            backend_name="gemini-oauth-plan",
+            code=reason,
+            details={"session_id": session_id},
+        )
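
A regression test for the new helper might look like the sketch below. The keyword arguments mirror the raise site above; the stub exception and dummy manager exist only so the snippet runs standalone, and whether the real `BackendError` stores `code` and `details` as attributes is an assumption:

```python
import pytest


class BackendError(Exception):
    """Local stand-in for src.core.common.exceptions.BackendError."""

    def __init__(self, message: str, backend_name: str, code: str, details: dict) -> None:
        super().__init__(message)
        self.backend_name = backend_name
        self.code = code
        self.details = details


class DummyManager:
    """Mirrors only the helper added in this commit."""

    def _raise_empty_stream_error(self, session_id: str, reason: str) -> None:
        raise BackendError(
            message="Upstream model returned no content after retries",
            backend_name="gemini-oauth-plan",
            code=reason,
            details={"session_id": session_id},
        )


def test_empty_stream_raises_backend_error() -> None:
    with pytest.raises(BackendError) as excinfo:
        DummyManager()._raise_empty_stream_error(
            session_id="session-123",
            reason="empty_stream_after_retries",
        )
    assert excinfo.value.code == "empty_stream_after_retries"
    assert excinfo.value.details == {"session_id": "session-123"}
```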
