Commit abedb3f

fix: correct langfuse import path and use client API
- Changed import from 'langfuse_context' to use get_client()
- Replaced all langfuse_context calls with client.update_current_generation()
- Added Langfuse client helper function with error handling
- Added graceful fallback if Langfuse unavailable
- Pinned langfuse>=2.0.0 in requirements
- Verified container builds and runs successfully
1 parent e79317a commit abedb3f
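For context, the change swaps the decorator-context API (langfuse_context.update_current_observation) for the client object returned by get_client(). A minimal sketch of the new pattern, assuming Langfuse is configured via the standard LANGFUSE_* environment variables (the function and model names below are hypothetical, not from this diff):

    from langfuse import observe, get_client

    @observe(as_type="generation")  # creates the generation that the updates below target
    def call_model(prompt: str) -> str:
        langfuse = get_client()  # shared Langfuse client
        # Rough equivalent of the old langfuse_context.update_current_observation(...)
        langfuse.update_current_generation(input=prompt, model="example-model")
        result = "..."  # the actual model invocation would go here
        langfuse.update_current_generation(output=result)
        return result

The update_current_generation calls only attach to an active observation, which is presumably why the observe import is retained alongside get_client.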

src/api/models/bedrock.py

Lines changed: 102 additions & 54 deletions
@@ -14,7 +14,7 @@
 import tiktoken
 from botocore.config import Config
 from fastapi import HTTPException
-from langfuse.decorators import observe, langfuse_context
+from langfuse import observe, get_client
 from starlette.concurrency import run_in_threadpool
 
 from api.models.base import BaseChatModel, BaseEmbeddingsModel
@@ -54,6 +54,20 @@
 
 logger = logging.getLogger(__name__)
 
+# Initialize Langfuse client
+_langfuse_client = None
+
+def _get_langfuse_client():
+    """Get or create the Langfuse client singleton."""
+    global _langfuse_client
+    if _langfuse_client is None:
+        try:
+            _langfuse_client = get_client()
+        except Exception as e:
+            logger.warning(f"Failed to initialize Langfuse client: {e}")
+            _langfuse_client = None
+    return _langfuse_client
+
 config = Config(
     connect_timeout=60,  # Connection timeout: 60 seconds
     read_timeout=900,  # Read timeout: 15 minutes (suitable for long streaming responses)
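One property of the helper worth noting: a failed initialization is not cached permanently. Because the sentinel stays None, the next call retries get_client(). The same idiom in isolation (a sketch with illustrative names, not code from this commit):

    import logging

    logger = logging.getLogger(__name__)
    _client = None  # module-level cache

    def _get_cached_client(factory):
        """Return the cached client, creating it on first use; retry after failures."""
        global _client
        if _client is None:
            try:
                _client = factory()
            except Exception as e:
                # Cache stays None, so the next caller retries.
                logger.warning(f"Client init failed: {e}")
        return _client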
@@ -253,18 +267,23 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
         }
 
         # Update Langfuse generation with input metadata
-        langfuse_context.update_current_observation(
-            input=messages,
-            model=model_id,
-            model_parameters=model_parameters,
-            metadata={
-                'system': args_clone.get('system', []),
-                'toolConfig': args_clone.get('toolConfig', {}),
-                'stream': stream
-            }
-        )
-        if DEBUG:
-            logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}")
+        langfuse_client = _get_langfuse_client()
+        if langfuse_client:
+            try:
+                langfuse_client.update_current_generation(
+                    input=messages,
+                    model=model_id,
+                    model_parameters=model_parameters,
+                    metadata={
+                        'system': args_clone.get('system', []),
+                        'toolConfig': args_clone.get('toolConfig', {}),
+                        'stream': stream
+                    }
+                )
+                if DEBUG:
+                    logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}")
+            except Exception as e:
+                logger.warning(f"Failed to update Langfuse: {e}")
 
         try:
             if stream:
@@ -302,41 +321,61 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
                 metadata["reasoning_content"] = reasoning_text
                 metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4
 
-            langfuse_context.update_current_observation(
-                output=output_message,
-                usage={
-                    "input": usage.get("inputTokens", 0),
-                    "output": usage.get("outputTokens", 0),
-                    "total": usage.get("totalTokens", 0)
-                },
-                metadata=metadata
-            )
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with output - "
-                            f"input_tokens={usage.get('inputTokens', 0)}, "
-                            f"output_tokens={usage.get('outputTokens', 0)}, "
-                            f"has_reasoning={has_reasoning}, "
-                            f"stop_reason={response.get('stopReason')}")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(
+                        output=output_message,
+                        usage={
+                            "input": usage.get("inputTokens", 0),
+                            "output": usage.get("outputTokens", 0),
+                            "total": usage.get("totalTokens", 0)
+                        },
+                        metadata=metadata
+                    )
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with output - "
+                                    f"input_tokens={usage.get('inputTokens', 0)}, "
+                                    f"output_tokens={usage.get('outputTokens', 0)}, "
+                                    f"has_reasoning={has_reasoning}, "
+                                    f"stop_reason={response.get('stopReason')}")
+                except Exception as e:
+                    logger.warning(f"Failed to update Langfuse: {e}")
         except bedrock_runtime.exceptions.ValidationException as e:
             error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}"
             logger.error(error_message)
-            langfuse_context.update_current_observation(level="ERROR", status_message=error_message)
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with ValidationException error")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(level="ERROR", status_message=error_message)
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with ValidationException error")
+                except Exception:
+                    pass
             raise HTTPException(status_code=400, detail=str(e))
         except bedrock_runtime.exceptions.ThrottlingException as e:
             error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}"
             logger.warning(error_message)
-            langfuse_context.update_current_observation(level="WARNING", status_message=error_message)
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with ThrottlingException warning")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(level="WARNING", status_message=error_message)
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with ThrottlingException warning")
+                except Exception:
+                    pass
             raise HTTPException(status_code=429, detail=str(e))
         except Exception as e:
             error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}"
             logger.error(error_message)
-            langfuse_context.update_current_observation(level="ERROR", status_message=error_message)
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with generic Exception error")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(level="ERROR", status_message=error_message)
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with generic Exception error")
+                except Exception:
+                    pass
             raise HTTPException(status_code=500, detail=str(e))
         return response
 
@@ -447,17 +486,21 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
             if metadata:
                 update_params["metadata"] = metadata
 
-            langfuse_context.update_current_observation(**update_params)
-
-            if DEBUG:
-                output_length = len(accumulated_output)
-                logger.info(f"Langfuse: Updated observation with streaming output - "
-                            f"chunks_count={output_length}, "
-                            f"output_chars={len(final_output) if accumulated_output else 0}, "
-                            f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, "
-                            f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, "
-                            f"has_reasoning={has_reasoning}, "
-                            f"finish_reason={finish_reason}")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(**update_params)
+                    if DEBUG:
+                        output_length = len(accumulated_output)
+                        logger.info(f"Langfuse: Updated observation with streaming output - "
+                                    f"chunks_count={output_length}, "
+                                    f"output_chars={len(final_output) if accumulated_output else 0}, "
+                                    f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, "
+                                    f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, "
+                                    f"has_reasoning={has_reasoning}, "
+                                    f"finish_reason={finish_reason}")
+                except Exception as e:
+                    logger.warning(f"Failed to update Langfuse: {e}")
 
         # return an [DONE] message at the end.
         yield self.stream_response_to_bytes()
@@ -468,12 +511,17 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
         except Exception as e:
             logger.error("Stream error for model %s: %s", chat_request.model, str(e))
             # Update Langfuse with error
-            langfuse_context.update_current_observation(
-                level="ERROR",
-                status_message=f"Stream error: {str(e)}"
-            )
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(
+                        level="ERROR",
+                        status_message=f"Stream error: {str(e)}"
+                    )
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}")
+                except Exception:
+                    pass
             error_event = Error(error=ErrorMessage(message=str(e)))
             yield self.stream_response_to_bytes(error_event)
 
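To exercise the graceful-fallback path the commit message describes, a quick check along these lines should work (hypothetical snippet; it assumes the module imports as api.models.bedrock):

    from unittest.mock import patch

    from api.models import bedrock

    # Simulate Langfuse being unavailable: get_client() raises during init.
    with patch("api.models.bedrock.get_client", side_effect=RuntimeError("Langfuse down")):
        bedrock._langfuse_client = None  # reset the module-level cache
        assert bedrock._get_langfuse_client() is None  # logs a warning, raises nothing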