11import base64
22import json
33import logging
4+ import os
45import re
56import time
67from abc import ABC
1314import tiktoken
1415from botocore .config import Config
1516from fastapi import HTTPException
17+ from langfuse import observe
1618from starlette .concurrency import run_in_threadpool
1719
1820from api .models .base import BaseChatModel , BaseEmbeddingsModel
@@ -230,6 +232,7 @@ def validate(self, chat_request: ChatRequest):
230232 detail = error ,
231233 )
232234
235+ @observe (as_type = "generation" , name = "Bedrock Converse" )
233236 async def _invoke_bedrock (self , chat_request : ChatRequest , stream = False ):
234237 """Common logic for invoke bedrock models"""
235238 if DEBUG :
@@ -240,6 +243,30 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
240243 if DEBUG :
241244 logger .info ("Bedrock request: " + json .dumps (str (args )))
242245
246+ # Import langfuse to update current generation (must be done inside the decorated function)
247+ from langfuse import langfuse_context
248+
249+ # Extract model metadata for Langfuse
250+ args_clone = args .copy ()
251+ messages = args_clone .get ('messages' , [])
252+ model_id = args_clone .get ('modelId' , 'unknown' )
253+ model_parameters = {
254+ ** args_clone .get ('inferenceConfig' , {}),
255+ ** args_clone .get ('additionalModelRequestFields' , {})
256+ }
257+
258+ # Update Langfuse generation with input metadata
259+ langfuse_context .update_current_observation (
260+ input = messages ,
261+ model = model_id ,
262+ model_parameters = model_parameters ,
263+ metadata = {
264+ 'system' : args_clone .get ('system' , []),
265+ 'toolConfig' : args_clone .get ('toolConfig' , {}),
266+ 'stream' : stream
267+ }
268+ )
269+
243270 try :
244271 if stream :
245272 # Run the blocking boto3 call in a thread pool
@@ -249,14 +276,38 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
249276 else :
250277 # Run the blocking boto3 call in a thread pool
251278 response = await run_in_threadpool (bedrock_runtime .converse , ** args )
279+
280+ # For non-streaming, extract response metadata immediately
281+ if response and not stream :
282+ output_message = response .get ("output" , {}).get ("message" , {})
283+ usage = response .get ("usage" , {})
284+
285+ langfuse_context .update_current_observation (
286+ output = output_message ,
287+ usage = {
288+ "input" : usage .get ("inputTokens" , 0 ),
289+ "output" : usage .get ("outputTokens" , 0 ),
290+ "total" : usage .get ("totalTokens" , 0 )
291+ },
292+ metadata = {
293+ "stopReason" : response .get ("stopReason" ),
294+ "ResponseMetadata" : response .get ("ResponseMetadata" , {})
295+ }
296+ )
252297 except bedrock_runtime .exceptions .ValidationException as e :
253- logger .error ("Bedrock validation error for model %s: %s" , chat_request .model , str (e ))
298+ error_message = f"Bedrock validation error for model { chat_request .model } : { str (e )} "
299+ logger .error (error_message )
300+ langfuse_context .update_current_observation (level = "ERROR" , status_message = error_message )
254301 raise HTTPException (status_code = 400 , detail = str (e ))
255302 except bedrock_runtime .exceptions .ThrottlingException as e :
256- logger .warning ("Bedrock throttling for model %s: %s" , chat_request .model , str (e ))
303+ error_message = f"Bedrock throttling for model { chat_request .model } : { str (e )} "
304+ logger .warning (error_message )
305+ langfuse_context .update_current_observation (level = "WARNING" , status_message = error_message )
257306 raise HTTPException (status_code = 429 , detail = str (e ))
258307 except Exception as e :
259- logger .error ("Bedrock invocation failed for model %s: %s" , chat_request .model , str (e ))
308+ error_message = f"Bedrock invocation failed for model { chat_request .model } : { str (e )} "
309+ logger .error (error_message )
310+ langfuse_context .update_current_observation (level = "ERROR" , status_message = error_message )
260311 raise HTTPException (status_code = 500 , detail = str (e ))
261312 return response
262313
0 commit comments