@@ -4,9 +4,10 @@
 This module provides the implementation of the backend request manager interface.
 """

-from __future__ import annotations
-
-import logging
+from __future__ import annotations
+
+import logging
+import time
 from collections.abc import AsyncIterator, Iterable
 from typing import Any, cast

@@ -356,7 +357,11 @@ async def _process_streaming_response(
                 "Maximum empty stream recovery attempts reached for session %s",
                 session_id,
             )
-            return stream_envelope
+            return await self._build_fallback_stream(
+                stream_envelope=stream_envelope,
+                session_id=session_id,
+                reason="Empty streaming response after maximum retries",
+            )

         original_stream = stream_envelope.content
         if original_stream is None:
@@ -499,7 +504,11 @@ async def _retry_stream_with_recovery(
                 reason,
                 session_id,
             )
-            return stream_envelope
+            return await self._build_fallback_stream(
+                stream_envelope=stream_envelope,
+                session_id=session_id,
+                reason=reason,
+            )

         logger.info("%s", reason)
         recovery_prompt = self._STREAM_RECOVERY_PROMPT
@@ -621,11 +630,11 @@ def _extract_text_from_chunk(chunk: ProcessedResponse | bytes) -> str:
             return ""

         # Handle case where chunk is a ProcessedResponse object
-        if isinstance(chunk, ProcessedResponse):
-            data = chunk.content
-            if isinstance(data, str):
-                return data
-            if isinstance(data, dict):
+        if isinstance(chunk, ProcessedResponse):
+            data = chunk.content
+            if isinstance(data, str):
+                return data
+            if isinstance(data, dict):
                 choices = data.get("choices")
                 if isinstance(choices, list) and choices:
                     choice = choices[0]
@@ -646,9 +655,59 @@ def _extract_text_from_chunk(chunk: ProcessedResponse | bytes) -> str:
                             fragments_processed.append(text_part)
                         if fragments_processed:
                             return "".join(fragments_processed)
-                    message = choice.get("message")
-                    if isinstance(message, dict):
-                        msg_content = message.get("content")
-                        if isinstance(msg_content, str):
-                            return msg_content
-        return ""
+                    message = choice.get("message")
+                    if isinstance(message, dict):
+                        msg_content = message.get("content")
+                        if isinstance(msg_content, str):
+                            return msg_content
+        return ""
+
+    async def _build_fallback_stream(
+        self,
+        stream_envelope: StreamingResponseEnvelope,
+        session_id: str,
+        reason: str,
+    ) -> StreamingResponseEnvelope:
+        """Generate a synthetic assistant response when recovery fails."""
+        logger.warning(
+            "Returning fallback assistant message for session %s: %s",
+            session_id,
+            reason,
+        )
+
+        payload = {
+            "id": "proxy-empty-response-retry",
+            "object": "chat.completion.chunk",
+            "created": int(time.time()),
+            "model": "proxy-empty-response",
+            "choices": [
+                {
+                    "index": 0,
+                    "delta": {
+                        "role": "assistant",
+                        "content": (
+                            "Proxy notice: the upstream model returned no content "
+                            "after multiple attempts. Please retry or adjust your request."
+                        ),
+                    },
+                    "finish_reason": "stop",
+                }
+            ],
+        }
+
+        async def fallback_stream() -> AsyncIterator[ProcessedResponse]:
+            yield ProcessedResponse(
+                content=payload,
+                metadata={
+                    "proxy_generated": True,
+                    "empty_response_recovery_failed": True,
+                    "recovery_reason": reason,
+                },
+            )
+
+        return StreamingResponseEnvelope(
+            content=fallback_stream(),
+            media_type=stream_envelope.media_type,
+            headers=stream_envelope.headers,
+            cancel_callback=stream_envelope.cancel_callback,
+        )
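
For context, the new recovery path reduces to wrapping a single synthetic chat.completion.chunk payload in a one-item async generator and handing it back inside a streaming envelope. Below is a minimal, self-contained sketch of that pattern; ProcessedResponse and StreamingResponseEnvelope here are simplified stand-ins for the real classes (which also carry headers and a cancel callback), and build_fallback_stream is a hypothetical free-function stand-in for the new method.

    import asyncio
    import time
    from collections.abc import AsyncIterator
    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class ProcessedResponse:  # simplified stand-in for the real class
        content: Any
        metadata: dict[str, Any] = field(default_factory=dict)

    @dataclass
    class StreamingResponseEnvelope:  # simplified stand-in for the real class
        content: AsyncIterator[ProcessedResponse]
        media_type: str = "text/event-stream"

    def build_fallback_stream(reason: str) -> StreamingResponseEnvelope:
        # Shape mirrors an OpenAI-style streaming chunk so existing clients
        # can render the notice like any other delta.
        payload = {
            "id": "proxy-empty-response-retry",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "proxy-empty-response",
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": "Proxy notice: no content."},
                    "finish_reason": "stop",
                }
            ],
        }

        async def fallback_stream() -> AsyncIterator[ProcessedResponse]:
            # Exactly one chunk, flagged so downstream code can tell it
            # apart from genuine upstream output.
            yield ProcessedResponse(
                content=payload,
                metadata={"proxy_generated": True, "recovery_reason": reason},
            )

        return StreamingResponseEnvelope(content=fallback_stream())

    async def main() -> None:
        envelope = build_fallback_stream("Empty streaming response after maximum retries")
        async for chunk in envelope.content:
            assert chunk.metadata["proxy_generated"] is True
            print(chunk.content["choices"][0]["delta"]["content"])

    asyncio.run(main())

Because the fallback chunk carries finish_reason "stop" and proxy_generated metadata, clients terminate the stream cleanly while operators can still distinguish synthetic notices from real model output in logs.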