@@ -263,6 +263,8 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
263263 'stream' : stream
264264 }
265265 )
266+ if DEBUG :
267+ logger .info (f"Langfuse: Updated observation with input - model={ model_id } , stream={ stream } , messages_count={ len (messages )} " )
266268
267269 try :
268270 if stream :
@@ -309,20 +311,32 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
309311 },
310312 metadata = metadata
311313 )
314+ if DEBUG :
315+ logger .info (f"Langfuse: Updated observation with output - "
316+ f"input_tokens={ usage .get ('inputTokens' , 0 )} , "
317+ f"output_tokens={ usage .get ('outputTokens' , 0 )} , "
318+ f"has_reasoning={ has_reasoning } , "
319+ f"stop_reason={ response .get ('stopReason' )} " )
312320 except bedrock_runtime .exceptions .ValidationException as e :
313321 error_message = f"Bedrock validation error for model { chat_request .model } : { str (e )} "
314322 logger .error (error_message )
315323 langfuse_context .update_current_observation (level = "ERROR" , status_message = error_message )
324+ if DEBUG :
325+ logger .info (f"Langfuse: Updated observation with ValidationException error" )
316326 raise HTTPException (status_code = 400 , detail = str (e ))
317327 except bedrock_runtime .exceptions .ThrottlingException as e :
318328 error_message = f"Bedrock throttling for model { chat_request .model } : { str (e )} "
319329 logger .warning (error_message )
320330 langfuse_context .update_current_observation (level = "WARNING" , status_message = error_message )
331+ if DEBUG :
332+ logger .info (f"Langfuse: Updated observation with ThrottlingException warning" )
321333 raise HTTPException (status_code = 429 , detail = str (e ))
322334 except Exception as e :
323335 error_message = f"Bedrock invocation failed for model { chat_request .model } : { str (e )} "
324336 logger .error (error_message )
325337 langfuse_context .update_current_observation (level = "ERROR" , status_message = error_message )
338+ if DEBUG :
339+ logger .info (f"Langfuse: Updated observation with generic Exception error" )
326340 raise HTTPException (status_code = 500 , detail = str (e ))
327341 return response
328342
@@ -358,6 +372,8 @@ async def _async_iterate(self, stream):
358372 async def chat_stream (self , chat_request : ChatRequest ) -> AsyncIterable [bytes ]:
359373 """Default implementation for Chat Stream API"""
360374 try :
375+ if DEBUG :
376+ logger .info (f"Langfuse: Starting streaming request for model={ chat_request .model } " )
361377 response = await self ._invoke_bedrock (chat_request , stream = True )
362378 message_id = self .generate_message_id ()
363379 stream = response .get ("stream" )
@@ -432,6 +448,16 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
432448 update_params ["metadata" ] = metadata
433449
434450 langfuse_context .update_current_observation (** update_params )
451+
452+ if DEBUG :
453+ output_length = len (accumulated_output )
454+ logger .info (f"Langfuse: Updated observation with streaming output - "
455+ f"chunks_count={ output_length } , "
456+ f"output_chars={ len (final_output ) if accumulated_output else 0 } , "
457+ f"input_tokens={ final_usage .prompt_tokens if final_usage else 'N/A' } , "
458+ f"output_tokens={ final_usage .completion_tokens if final_usage else 'N/A' } , "
459+ f"has_reasoning={ has_reasoning } , "
460+ f"finish_reason={ finish_reason } " )
435461
436462 # return an [DONE] message at the end.
437463 yield self .stream_response_to_bytes ()
@@ -443,6 +469,8 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
443469 level = "ERROR" ,
444470 status_message = f"Stream error: { str (e )} "
445471 )
472+ if DEBUG :
473+ logger .info (f"Langfuse: Updated observation with streaming error - error={ str (e )[:100 ]} " )
446474 error_event = Error (error = ErrorMessage (message = str (e )))
447475 yield self .stream_response_to_bytes (error_event )
448476