1414import tiktoken
1515from botocore .config import Config
1616from fastapi import HTTPException
17- from langfuse import observe , get_client
17+ from langfuse . decorators import langfuse_context , observe
1818from starlette .concurrency import run_in_threadpool
1919
2020from api .models .base import BaseChatModel , BaseEmbeddingsModel
5454
5555logger = logging .getLogger (__name__ )
5656
57- # Initialize Langfuse client
58- _langfuse_client = None
59-
60- def _get_langfuse_client ():
61- """Get or create the Langfuse client singleton."""
62- global _langfuse_client
63- if _langfuse_client is None :
64- try :
65- _langfuse_client = get_client ()
66- except Exception as e :
67- logger .warning (f"Failed to initialize Langfuse client: { e } " )
68- _langfuse_client = None
69- return _langfuse_client
70-
7157config = Config (
7258 connect_timeout = 60 , # Connection timeout: 60 seconds
7359 read_timeout = 900 , # Read timeout: 15 minutes (suitable for long streaming responses)
@@ -267,23 +253,18 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
267253 }
268254
269255 # Update Langfuse generation with input metadata
270- langfuse_client = _get_langfuse_client ()
271- if langfuse_client :
272- try :
273- langfuse_client .update_current_generation (
274- input = messages ,
275- model = model_id ,
276- model_parameters = model_parameters ,
277- metadata = {
278- 'system' : args_clone .get ('system' , []),
279- 'toolConfig' : args_clone .get ('toolConfig' , {}),
280- 'stream' : stream
281- }
282- )
283- if DEBUG :
284- logger .info (f"Langfuse: Updated observation with input - model={ model_id } , stream={ stream } , messages_count={ len (messages )} " )
285- except Exception as e :
286- logger .warning (f"Failed to update Langfuse: { e } " )
256+ langfuse_context .update_current_observation (
257+ input = messages ,
258+ model = model_id ,
259+ model_parameters = model_parameters ,
260+ metadata = {
261+ 'system' : args_clone .get ('system' , []),
262+ 'toolConfig' : args_clone .get ('toolConfig' , {}),
263+ 'stream' : stream
264+ }
265+ )
266+ if DEBUG :
267+ logger .info (f"Langfuse: Updated observation with input - model={ model_id } , stream={ stream } , messages_count={ len (messages )} " )
287268
288269 try :
289270 if stream :
@@ -321,61 +302,41 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
321302 metadata ["reasoning_content" ] = reasoning_text
322303 metadata ["reasoning_tokens_estimate" ] = len (reasoning_text ) // 4
323304
324- langfuse_client = _get_langfuse_client ()
325- if langfuse_client :
326- try :
327- langfuse_client .update_current_generation (
328- output = output_message ,
329- usage = {
330- "input" : usage .get ("inputTokens" , 0 ),
331- "output" : usage .get ("outputTokens" , 0 ),
332- "total" : usage .get ("totalTokens" , 0 )
333- },
334- metadata = metadata
335- )
336- if DEBUG :
337- logger .info (f"Langfuse: Updated observation with output - "
338- f"input_tokens={ usage .get ('inputTokens' , 0 )} , "
339- f"output_tokens={ usage .get ('outputTokens' , 0 )} , "
340- f"has_reasoning={ has_reasoning } , "
341- f"stop_reason={ response .get ('stopReason' )} " )
342- except Exception as e :
343- logger .warning (f"Failed to update Langfuse: { e } " )
305+ langfuse_context .update_current_observation (
306+ output = output_message ,
307+ usage = {
308+ "input" : usage .get ("inputTokens" , 0 ),
309+ "output" : usage .get ("outputTokens" , 0 ),
310+ "total" : usage .get ("totalTokens" , 0 )
311+ },
312+ metadata = metadata
313+ )
314+ if DEBUG :
315+ logger .info (f"Langfuse: Updated observation with output - "
316+ f"input_tokens={ usage .get ('inputTokens' , 0 )} , "
317+ f"output_tokens={ usage .get ('outputTokens' , 0 )} , "
318+ f"has_reasoning={ has_reasoning } , "
319+ f"stop_reason={ response .get ('stopReason' )} " )
344320 except bedrock_runtime .exceptions .ValidationException as e :
345321 error_message = f"Bedrock validation error for model { chat_request .model } : { str (e )} "
346322 logger .error (error_message )
347- langfuse_client = _get_langfuse_client ()
348- if langfuse_client :
349- try :
350- langfuse_client .update_current_generation (level = "ERROR" , status_message = error_message )
351- if DEBUG :
352- logger .info (f"Langfuse: Updated observation with ValidationException error" )
353- except Exception :
354- pass
323+ langfuse_context .update_current_observation (level = "ERROR" , status_message = error_message )
324+ if DEBUG :
325+ logger .info ("Langfuse: Updated observation with ValidationException error" )
355326 raise HTTPException (status_code = 400 , detail = str (e ))
356327 except bedrock_runtime .exceptions .ThrottlingException as e :
357328 error_message = f"Bedrock throttling for model { chat_request .model } : { str (e )} "
358329 logger .warning (error_message )
359- langfuse_client = _get_langfuse_client ()
360- if langfuse_client :
361- try :
362- langfuse_client .update_current_generation (level = "WARNING" , status_message = error_message )
363- if DEBUG :
364- logger .info (f"Langfuse: Updated observation with ThrottlingException warning" )
365- except Exception :
366- pass
330+ langfuse_context .update_current_observation (level = "WARNING" , status_message = error_message )
331+ if DEBUG :
332+ logger .info ("Langfuse: Updated observation with ThrottlingException warning" )
367333 raise HTTPException (status_code = 429 , detail = str (e ))
368334 except Exception as e :
369335 error_message = f"Bedrock invocation failed for model { chat_request .model } : { str (e )} "
370336 logger .error (error_message )
371- langfuse_client = _get_langfuse_client ()
372- if langfuse_client :
373- try :
374- langfuse_client .update_current_generation (level = "ERROR" , status_message = error_message )
375- if DEBUG :
376- logger .info (f"Langfuse: Updated observation with generic Exception error" )
377- except Exception :
378- pass
337+ langfuse_context .update_current_observation (level = "ERROR" , status_message = error_message )
338+ if DEBUG :
339+ logger .info ("Langfuse: Updated observation with generic Exception error" )
379340 raise HTTPException (status_code = 500 , detail = str (e ))
380341 return response
381342
@@ -486,21 +447,16 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
486447 if metadata :
487448 update_params ["metadata" ] = metadata
488449
489- langfuse_client = _get_langfuse_client ()
490- if langfuse_client :
491- try :
492- langfuse_client .update_current_generation (** update_params )
493- if DEBUG :
494- output_length = len (accumulated_output )
495- logger .info (f"Langfuse: Updated observation with streaming output - "
496- f"chunks_count={ output_length } , "
497- f"output_chars={ len (final_output ) if accumulated_output else 0 } , "
498- f"input_tokens={ final_usage .prompt_tokens if final_usage else 'N/A' } , "
499- f"output_tokens={ final_usage .completion_tokens if final_usage else 'N/A' } , "
500- f"has_reasoning={ has_reasoning } , "
501- f"finish_reason={ finish_reason } " )
502- except Exception as e :
503- logger .warning (f"Failed to update Langfuse: { e } " )
450+ langfuse_context .update_current_observation (** update_params )
451+ if DEBUG :
452+ output_length = len (accumulated_output )
453+ logger .info (f"Langfuse: Updated observation with streaming output - "
454+ f"chunks_count={ output_length } , "
455+ f"output_chars={ len (final_output ) if accumulated_output else 0 } , "
456+ f"input_tokens={ final_usage .prompt_tokens if final_usage else 'N/A' } , "
457+ f"output_tokens={ final_usage .completion_tokens if final_usage else 'N/A' } , "
458+ f"has_reasoning={ has_reasoning } , "
459+ f"finish_reason={ finish_reason } " )
504460
505461 # return an [DONE] message at the end.
506462 yield self .stream_response_to_bytes ()
@@ -511,17 +467,12 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
511467 except Exception as e :
512468 logger .error ("Stream error for model %s: %s" , chat_request .model , str (e ))
513469 # Update Langfuse with error
514- langfuse_client = _get_langfuse_client ()
515- if langfuse_client :
516- try :
517- langfuse_client .update_current_generation (
518- level = "ERROR" ,
519- status_message = f"Stream error: { str (e )} "
520- )
521- if DEBUG :
522- logger .info (f"Langfuse: Updated observation with streaming error - error={ str (e )[:100 ]} " )
523- except Exception :
524- pass
470+ langfuse_context .update_current_observation (
471+ level = "ERROR" ,
472+ status_message = f"Stream error: { str (e )} "
473+ )
474+ if DEBUG :
475+ logger .info (f"Langfuse: Updated observation with streaming error - error={ str (e )[:100 ]} " )
525476 error_event = Error (error = ErrorMessage (message = str (e )))
526477 yield self .stream_response_to_bytes (error_event )
527478
0 commit comments