From f14dd1cb4e8fedf0dd67bb3cb13ec9ccad1893da Mon Sep 17 00:00:00 2001 From: dane Date: Tue, 18 Mar 2025 10:11:45 -0500 Subject: [PATCH 01/19] feat: customize model list using environment --- src/api/models/bedrock.py | 6 +++++- src/api/setting.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index be3fab28..9bbb8103 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -36,7 +36,7 @@ Usage, UserMessage, ) -from api.setting import AWS_REGION, DEBUG, DEFAULT_MODEL, ENABLE_CROSS_REGION_INFERENCE +from api.setting import AWS_REGION, CUSTOM_MODEL_LIST, DEBUG, DEFAULT_MODEL, ENABLE_CROSS_REGION_INFERENCE logger = logging.getLogger(__name__) @@ -101,6 +101,10 @@ def list_bedrock_models() -> dict: if not stream_supported or status not in ["ACTIVE", "LEGACY"]: continue + # if the user provides a custom model list, filter only those models + if CUSTOM_MODEL_LIST and model_id not in CUSTOM_MODEL_LIST: + continue + inference_types = model.get("inferenceTypesSupported", []) input_modalities = model["inputModalities"] # Add on-demand model list diff --git a/src/api/setting.py b/src/api/setting.py index e090300a..1decfd8c 100644 --- a/src/api/setting.py +++ b/src/api/setting.py @@ -15,4 +15,5 @@ AWS_REGION = os.environ.get("AWS_REGION", "us-west-2") DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "anthropic.claude-3-sonnet-20240229-v1:0") DEFAULT_EMBEDDING_MODEL = os.environ.get("DEFAULT_EMBEDDING_MODEL", "cohere.embed-multilingual-v3") +CUSTOM_MODEL_LIST = os.environ.get("CUSTOM_MODEL_LIST", "").split(",") ENABLE_CROSS_REGION_INFERENCE = os.environ.get("ENABLE_CROSS_REGION_INFERENCE", "true").lower() != "false" From 796670b2edd2247507f4b85b95cc2cad501bea96 Mon Sep 17 00:00:00 2001 From: 2underscores Date: Wed, 28 May 2025 20:14:07 +1000 Subject: [PATCH 02/19] Fixed blank CUSTOM_MODEL_LIST handling --- src/api/setting.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/src/api/setting.py b/src/api/setting.py index 1decfd8c..40c23182 100644 --- a/src/api/setting.py +++ b/src/api/setting.py @@ -15,5 +15,6 @@ AWS_REGION = os.environ.get("AWS_REGION", "us-west-2") DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "anthropic.claude-3-sonnet-20240229-v1:0") DEFAULT_EMBEDDING_MODEL = os.environ.get("DEFAULT_EMBEDDING_MODEL", "cohere.embed-multilingual-v3") -CUSTOM_MODEL_LIST = os.environ.get("CUSTOM_MODEL_LIST", "").split(",") +CUSTOM_MODEL_CSV = os.environ.get("CUSTOM_MODEL_LIST", "") +CUSTOM_MODEL_LIST = [m.strip() for m in CUSTOM_MODEL_CSV.split(",") if m.strip()] ENABLE_CROSS_REGION_INFERENCE = os.environ.get("ENABLE_CROSS_REGION_INFERENCE", "true").lower() != "false" From d133e2fe9bfd241fdfc2b853ded56317e44d5406 Mon Sep 17 00:00:00 2001 From: Oscar Fawkes Date: Mon, 16 Jun 2025 16:01:14 +1000 Subject: [PATCH 03/19] Configurable AWS retries via environment variable --- src/api/models/bedrock.py | 8 ++++++-- src/api/setting.py | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index c4e1558c..f618cd2f 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -38,11 +38,15 @@ Usage, UserMessage, ) -from api.setting import AWS_REGION, CUSTOM_MODEL_LIST, DEBUG, DEFAULT_MODEL, ENABLE_CROSS_REGION_INFERENCE +from api.setting import AWS_REGION, CUSTOM_MODEL_LIST, DEBUG, DEFAULT_MODEL, ENABLE_CROSS_REGION_INFERENCE, MAX_RETRIES_AWS logger = logging.getLogger(__name__) -config = Config(connect_timeout=60, read_timeout=120, retries={"max_attempts": 1}) +config = Config( + connect_timeout=60, + read_timeout=120, + retries={"mode": "standard", "max_attempts": MAX_RETRIES_AWS} +) bedrock_runtime = boto3.client( service_name="bedrock-runtime", diff --git a/src/api/setting.py b/src/api/setting.py index 40c23182..69500028 100644 --- a/src/api/setting.py +++ b/src/api/setting.py @@ -18,3 +18,4 @@ CUSTOM_MODEL_CSV = os.environ.get("CUSTOM_MODEL_LIST", "") 
CUSTOM_MODEL_LIST = [m.strip() for m in CUSTOM_MODEL_CSV.split(",") if m.strip()] ENABLE_CROSS_REGION_INFERENCE = os.environ.get("ENABLE_CROSS_REGION_INFERENCE", "true").lower() != "false" +MAX_RETRIES_AWS = int(os.environ.get("MAX_RETRIES_AWS", "3")) From f48f37ebde3460b08dccc0c257fd4af5f0954d09 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Thu, 7 Aug 2025 14:58:38 +1000 Subject: [PATCH 04/19] fix: remove custom models bug --- src/api/models/bedrock.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 72907662..de2d87e5 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -145,10 +145,6 @@ def list_bedrock_models() -> dict: if not stream_supported or status not in ["ACTIVE", "LEGACY"]: continue - # if the user provides a custom model list, filter only those models - if CUSTOM_MODEL_LIST and model_id not in CUSTOM_MODEL_LIST: - continue - inference_types = model.get("inferenceTypesSupported", []) input_modalities = model["inputModalities"] # Add on-demand model list From 9c89baf5efd86411fc2a7779f3077061d65f6702 Mon Sep 17 00:00:00 2001 From: Rizvi Rahim Date: Thu, 7 Aug 2025 09:19:14 +0600 Subject: [PATCH 05/19] Add pagination to list_inference_profiles calls --- src/api/models/bedrock.py | 41 ++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index de2d87e5..69ab134f 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -108,31 +108,32 @@ def list_bedrock_models() -> dict: if ENABLE_CROSS_REGION_INFERENCE: # List system defined inference profile IDs - response = bedrock_client.list_inference_profiles(maxResults=1000, typeEquals="SYSTEM_DEFINED") - profile_list = [p["inferenceProfileId"] for p in response["inferenceProfileSummaries"]] + paginator = bedrock_client.get_paginator('list_inference_profiles') + for page in paginator.paginate(maxResults=1000, 
typeEquals="SYSTEM_DEFINED"): + profile_list.extend([p["inferenceProfileId"] for p in page["inferenceProfileSummaries"]]) if ENABLE_APPLICATION_INFERENCE_PROFILES: # List application defined inference profile IDs and create mapping - response = bedrock_client.list_inference_profiles(maxResults=1000, typeEquals="APPLICATION") - - for profile in response["inferenceProfileSummaries"]: - try: - profile_arn = profile.get("inferenceProfileArn") - if not profile_arn: + paginator = bedrock_client.get_paginator('list_inference_profiles') + for page in paginator.paginate(maxResults=1000, typeEquals="APPLICATION"): + for profile in page["inferenceProfileSummaries"]: + try: + profile_arn = profile.get("inferenceProfileArn") + if not profile_arn: + continue + + # Process all models in the profile + models = profile.get("models", []) + for model in models: + model_arn = model.get("modelArn", "") + if model_arn: + model_id = model_arn.split('/')[-1] if '/' in model_arn else model_arn + if model_id: + app_profile_dict[model_id] = profile_arn + except Exception as e: + logger.warning(f"Error processing application profile: {e}") continue - # Process all models in the profile - models = profile.get("models", []) - for model in models: - model_arn = model.get("modelArn", "") - if model_arn: - model_id = model_arn.split('/')[-1] if '/' in model_arn else model_arn - if model_id: - app_profile_dict[model_id] = profile_arn - except Exception as e: - logger.warning(f"Error processing application profile: {e}") - continue - # List foundation models, only cares about text outputs here. 
response = bedrock_client.list_foundation_models(byOutputModality="TEXT") From 8c7bd77e266bc6b059cc4466344adb3a7b4b85da Mon Sep 17 00:00:00 2001 From: Oscar Fawkes Date: Tue, 23 Sep 2025 17:41:44 +1000 Subject: [PATCH 06/19] impl model list caching --- src/api/models/bedrock.py | 32 +++++++++++++++++++++++++++++--- src/api/setting.py | 1 + 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 69ab134f..9cdeb266 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -46,6 +46,7 @@ ENABLE_CROSS_REGION_INFERENCE, ENABLE_APPLICATION_INFERENCE_PROFILES, MAX_RETRIES_AWS, + MODEL_CACHE_TTL, ) logger = logging.getLogger(__name__) @@ -171,15 +172,40 @@ def list_bedrock_models() -> dict: return model_list +# In-memory cache +_model_cache = { + "data": None, + "timestamp": 0 +} + +def _get_cached_models(): + """Get models from in-memory cache if still valid.""" + global _model_cache + + current_time = time.time() + cache_age = current_time - _model_cache["timestamp"] + + if _model_cache["data"] is None or cache_age > MODEL_CACHE_TTL: + fresh_models = list_bedrock_models() + if fresh_models: + _model_cache["data"] = fresh_models + _model_cache["timestamp"] = current_time + return fresh_models + else: + # Cache hit + return _model_cache["data"] + # Initialize the model list. 
-bedrock_model_list = list_bedrock_models() +bedrock_model_list = _get_cached_models() class BedrockModel(BaseChatModel): def list_models(self) -> list[str]: - """Always refresh the latest model list""" + """Get model list using in-memory cache with TTL""" global bedrock_model_list - bedrock_model_list = list_bedrock_models() + cached_models = _get_cached_models() + if cached_models: + bedrock_model_list = cached_models return list(bedrock_model_list.keys()) def validate(self, chat_request: ChatRequest): diff --git a/src/api/setting.py b/src/api/setting.py index d9dff52d..da7281a2 100644 --- a/src/api/setting.py +++ b/src/api/setting.py @@ -20,3 +20,4 @@ ENABLE_CROSS_REGION_INFERENCE = os.environ.get("ENABLE_CROSS_REGION_INFERENCE", "true").lower() != "false" MAX_RETRIES_AWS = int(os.environ.get("MAX_RETRIES_AWS", "3")) ENABLE_APPLICATION_INFERENCE_PROFILES = os.environ.get("ENABLE_APPLICATION_INFERENCE_PROFILES", "true").lower() != "false" +MODEL_CACHE_TTL = int(os.environ.get("MODEL_CACHE_TTL", "3600")) # 1 hour default From 4934ec0edc57e662cf5a9b0f3ef2c10669e43014 Mon Sep 17 00:00:00 2001 From: Oscar Fawkes Date: Tue, 23 Sep 2025 17:42:26 +1000 Subject: [PATCH 07/19] impl uvicorn worker count configurability in containerised deployments --- src/Dockerfile_ecs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Dockerfile_ecs b/src/Dockerfile_ecs index c1240104..ec690af9 100644 --- a/src/Dockerfile_ecs +++ b/src/Dockerfile_ecs @@ -9,5 +9,6 @@ RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt COPY ./api /app/api ENV PORT=80 +ENV WORKERS=1 -CMD ["sh", "-c", "uvicorn api.app:app --host 0.0.0.0 --port ${PORT}"] +CMD ["sh", "-c", "uvicorn api.app:app --host 0.0.0.0 --port ${PORT} --workers ${WORKERS}"] From 221e93554da9a375cfb4e238fd1e6edd14c5546d Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Thu, 13 Nov 2025 17:00:12 +1100 Subject: [PATCH 08/19] feat: add langfuse observability to bedrock gateway --- src/api/models/bedrock.py | 
130 +++++++++++++++++++++++++++++++++++++- src/requirements.txt | 1 + 2 files changed, 128 insertions(+), 3 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index db8af517..28871806 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -1,6 +1,7 @@ import base64 import json import logging +import os import re import time from abc import ABC @@ -13,6 +14,7 @@ import tiktoken from botocore.config import Config from fastapi import HTTPException +from langfuse import observe, langfuse_context from starlette.concurrency import run_in_threadpool from api.models.base import BaseChatModel, BaseEmbeddingsModel @@ -230,6 +232,7 @@ def validate(self, chat_request: ChatRequest): detail=error, ) + @observe(as_type="generation", name="Bedrock Converse") async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): """Common logic for invoke bedrock models""" if DEBUG: @@ -240,6 +243,27 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): if DEBUG: logger.info("Bedrock request: " + json.dumps(str(args))) + # Extract model metadata for Langfuse + args_clone = args.copy() + messages = args_clone.get('messages', []) + model_id = args_clone.get('modelId', 'unknown') + model_parameters = { + **args_clone.get('inferenceConfig', {}), + **args_clone.get('additionalModelRequestFields', {}) + } + + # Update Langfuse generation with input metadata + langfuse_context.update_current_observation( + input=messages, + model=model_id, + model_parameters=model_parameters, + metadata={ + 'system': args_clone.get('system', []), + 'toolConfig': args_clone.get('toolConfig', {}), + 'stream': stream + } + ) + try: if stream: # Run the blocking boto3 call in a thread pool @@ -249,14 +273,56 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): else: # Run the blocking boto3 call in a thread pool response = await run_in_threadpool(bedrock_runtime.converse, **args) + + # For non-streaming, extract 
response metadata immediately + if response and not stream: + output_message = response.get("output", {}).get("message", {}) + usage = response.get("usage", {}) + + # Build metadata + metadata = { + "stopReason": response.get("stopReason"), + "ResponseMetadata": response.get("ResponseMetadata", {}) + } + + # Check for reasoning content in response + has_reasoning = False + reasoning_text = "" + if output_message and "content" in output_message: + for content_block in output_message.get("content", []): + if "reasoningContent" in content_block: + has_reasoning = True + reasoning_text = content_block.get("reasoningContent", {}).get("reasoningText", {}).get("text", "") + break + + if has_reasoning and reasoning_text: + metadata["has_extended_thinking"] = True + metadata["reasoning_content"] = reasoning_text + metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 + + langfuse_context.update_current_observation( + output=output_message, + usage={ + "input": usage.get("inputTokens", 0), + "output": usage.get("outputTokens", 0), + "total": usage.get("totalTokens", 0) + }, + metadata=metadata + ) except bedrock_runtime.exceptions.ValidationException as e: - logger.error("Bedrock validation error for model %s: %s", chat_request.model, str(e)) + error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}" + logger.error(error_message) + langfuse_context.update_current_observation(level="ERROR", status_message=error_message) raise HTTPException(status_code=400, detail=str(e)) except bedrock_runtime.exceptions.ThrottlingException as e: - logger.warning("Bedrock throttling for model %s: %s", chat_request.model, str(e)) + error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}" + logger.warning(error_message) + langfuse_context.update_current_observation(level="WARNING", status_message=error_message) raise HTTPException(status_code=429, detail=str(e)) except Exception as e: - logger.error("Bedrock invocation failed for 
model %s: %s", chat_request.model, str(e)) + error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}" + logger.error(error_message) + langfuse_context.update_current_observation(level="ERROR", status_message=error_message) raise HTTPException(status_code=500, detail=str(e)) return response @@ -296,11 +362,37 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: message_id = self.generate_message_id() stream = response.get("stream") self.think_emitted = False + + # Track streaming output and usage for Langfuse + accumulated_output = [] + accumulated_reasoning = [] + final_usage = None + finish_reason = None + has_reasoning = False + async for chunk in self._async_iterate(stream): args = {"model_id": chat_request.model, "message_id": message_id, "chunk": chunk} stream_response = self._create_response_stream(**args) if not stream_response: continue + + # Accumulate output content for Langfuse tracking + if stream_response.choices: + for choice in stream_response.choices: + if choice.delta and choice.delta.content: + content = choice.delta.content + # Check if this is reasoning content (wrapped in tags) + if "" in content or self.think_emitted: + accumulated_reasoning.append(content) + has_reasoning = True + accumulated_output.append(content) + if choice.finish_reason: + finish_reason = choice.finish_reason + + # Capture final usage metrics for Langfuse tracking + if stream_response.usage: + final_usage = stream_response.usage + if DEBUG: logger.info("Proxy response :" + stream_response.model_dump_json()) if stream_response.choices: @@ -314,11 +406,43 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: # All other chunks will also include a usage field, but with a null value. 
yield self.stream_response_to_bytes(stream_response) + # Update Langfuse with final streaming metadata + if final_usage or accumulated_output: + update_params = {} + if accumulated_output: + final_output = "".join(accumulated_output) + update_params["output"] = final_output + if final_usage: + update_params["usage"] = { + "input": final_usage.prompt_tokens, + "output": final_usage.completion_tokens, + "total": final_usage.total_tokens + } + # Build metadata + metadata = {} + if finish_reason: + metadata["finish_reason"] = finish_reason + if has_reasoning and accumulated_reasoning: + reasoning_text = "".join(accumulated_reasoning) + metadata["has_extended_thinking"] = True + metadata["reasoning_content"] = reasoning_text + # Estimate reasoning tokens (rough approximation: ~4 chars per token) + metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 + if metadata: + update_params["metadata"] = metadata + + langfuse_context.update_current_observation(**update_params) + # return an [DONE] message at the end. 
yield self.stream_response_to_bytes() self.think_emitted = False # Cleanup except Exception as e: logger.error("Stream error for model %s: %s", chat_request.model, str(e)) + # Update Langfuse with error + langfuse_context.update_current_observation( + level="ERROR", + status_message=f"Stream error: {str(e)}" + ) error_event = Error(error=ErrorMessage(message=str(e))) yield self.stream_response_to_bytes(error_event) diff --git a/src/requirements.txt b/src/requirements.txt index 9aa0e2da..e803d326 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -7,3 +7,4 @@ requests==2.32.4 numpy==2.2.5 boto3==1.40.4 botocore==1.40.4 +langfuse From 56db50e94208823a841f1b14c5bb2d1cac3ec953 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 10:03:33 +1100 Subject: [PATCH 09/19] chore: add debugging statements --- src/api/models/bedrock.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 28871806..a79c31b2 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -263,6 +263,8 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): 'stream': stream } ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}") try: if stream: @@ -309,20 +311,32 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): }, metadata=metadata ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with output - " + f"input_tokens={usage.get('inputTokens', 0)}, " + f"output_tokens={usage.get('outputTokens', 0)}, " + f"has_reasoning={has_reasoning}, " + f"stop_reason={response.get('stopReason')}") except bedrock_runtime.exceptions.ValidationException as e: error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}" logger.error(error_message) langfuse_context.update_current_observation(level="ERROR", 
status_message=error_message) + if DEBUG: + logger.info(f"Langfuse: Updated observation with ValidationException error") raise HTTPException(status_code=400, detail=str(e)) except bedrock_runtime.exceptions.ThrottlingException as e: error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}" logger.warning(error_message) langfuse_context.update_current_observation(level="WARNING", status_message=error_message) + if DEBUG: + logger.info(f"Langfuse: Updated observation with ThrottlingException warning") raise HTTPException(status_code=429, detail=str(e)) except Exception as e: error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}" logger.error(error_message) langfuse_context.update_current_observation(level="ERROR", status_message=error_message) + if DEBUG: + logger.info(f"Langfuse: Updated observation with generic Exception error") raise HTTPException(status_code=500, detail=str(e)) return response @@ -358,6 +372,8 @@ async def _async_iterate(self, stream): async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: """Default implementation for Chat Stream API""" try: + if DEBUG: + logger.info(f"Langfuse: Starting streaming request for model={chat_request.model}") response = await self._invoke_bedrock(chat_request, stream=True) message_id = self.generate_message_id() stream = response.get("stream") @@ -432,6 +448,16 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: update_params["metadata"] = metadata langfuse_context.update_current_observation(**update_params) + + if DEBUG: + output_length = len(accumulated_output) + logger.info(f"Langfuse: Updated observation with streaming output - " + f"chunks_count={output_length}, " + f"output_chars={len(final_output) if accumulated_output else 0}, " + f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " + f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, " + 
f"has_reasoning={has_reasoning}, " + f"finish_reason={finish_reason}") # return an [DONE] message at the end. yield self.stream_response_to_bytes() @@ -443,6 +469,8 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: level="ERROR", status_message=f"Stream error: {str(e)}" ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") error_event = Error(error=ErrorMessage(message=str(e))) yield self.stream_response_to_bytes(error_event) From 7d7a69dd31e19b8003ce08af3c71f92add7e7163 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 10:26:18 +1100 Subject: [PATCH 10/19] fix: add missing think tag --- src/api/models/bedrock.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index a79c31b2..6764d22a 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -899,6 +899,7 @@ def _create_response_stream( # Port of "signature_delta" if self.think_emitted: message = ChatResponseMessage(content="\n \n\n") + self.think_emitted = False # Reset flag after closing else: return None # Ignore signature if no started else: From 4c98a9892e38d4667c2480994f6e79eab6877606 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 10:27:46 +1100 Subject: [PATCH 11/19] fix: add HTTPException --- src/api/models/bedrock.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 6764d22a..0cc5bf1e 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -462,6 +462,9 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: # return an [DONE] message at the end. 
yield self.stream_response_to_bytes() self.think_emitted = False # Cleanup + except HTTPException: + # HTTPException already has Langfuse updated in _invoke_bedrock, re-raise it + raise except Exception as e: logger.error("Stream error for model %s: %s", chat_request.model, str(e)) # Update Langfuse with error From e79317ad074fca1aac609d2d4c3c18e01ccc1dc2 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 14:05:20 +1100 Subject: [PATCH 12/19] fix: resolve incorrect import --- src/api/models/bedrock.py | 2 +- src/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 0cc5bf1e..6b0dbd79 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -14,7 +14,7 @@ import tiktoken from botocore.config import Config from fastapi import HTTPException -from langfuse import observe, langfuse_context +from langfuse.decorators import observe, langfuse_context from starlette.concurrency import run_in_threadpool from api.models.base import BaseChatModel, BaseEmbeddingsModel diff --git a/src/requirements.txt b/src/requirements.txt index e803d326..1f8974b9 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -7,4 +7,4 @@ requests==2.32.4 numpy==2.2.5 boto3==1.40.4 botocore==1.40.4 -langfuse +langfuse>=2.0.0 From abedb3f00987e66397168fd7f6e901cabb854e9f Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 14:15:25 +1100 Subject: [PATCH 13/19] fix: correct langfuse import path and use client API - Changed import from 'langfuse_context' to use get_client() - Replaced all langfuse_context calls with client.update_current_generation() - Added Langfuse client helper function with error handling - Added graceful fallback if Langfuse unavailable - Pinned langfuse>=2.0.0 in requirements - Verified container builds and runs successfully --- src/api/models/bedrock.py | 156 +++++++++++++++++++++++++------------- 1 file changed, 102 insertions(+), 54 
deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 6b0dbd79..8ccb2df4 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -14,7 +14,7 @@ import tiktoken from botocore.config import Config from fastapi import HTTPException -from langfuse.decorators import observe, langfuse_context +from langfuse import observe, get_client from starlette.concurrency import run_in_threadpool from api.models.base import BaseChatModel, BaseEmbeddingsModel @@ -54,6 +54,20 @@ logger = logging.getLogger(__name__) +# Initialize Langfuse client +_langfuse_client = None + +def _get_langfuse_client(): + """Get or create the Langfuse client singleton.""" + global _langfuse_client + if _langfuse_client is None: + try: + _langfuse_client = get_client() + except Exception as e: + logger.warning(f"Failed to initialize Langfuse client: {e}") + _langfuse_client = None + return _langfuse_client + config = Config( connect_timeout=60, # Connection timeout: 60 seconds read_timeout=900, # Read timeout: 15 minutes (suitable for long streaming responses) @@ -253,18 +267,23 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): } # Update Langfuse generation with input metadata - langfuse_context.update_current_observation( - input=messages, - model=model_id, - model_parameters=model_parameters, - metadata={ - 'system': args_clone.get('system', []), - 'toolConfig': args_clone.get('toolConfig', {}), - 'stream': stream - } - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + langfuse_client.update_current_generation( + input=messages, + model=model_id, + model_parameters=model_parameters, + metadata={ + 'system': args_clone.get('system', []), + 'toolConfig': args_clone.get('toolConfig', {}), + 'stream': stream + } + ) + if DEBUG: + logger.info(f"Langfuse: Updated 
observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}") + except Exception as e: + logger.warning(f"Failed to update Langfuse: {e}") try: if stream: @@ -302,41 +321,61 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): metadata["reasoning_content"] = reasoning_text metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 - langfuse_context.update_current_observation( - output=output_message, - usage={ - "input": usage.get("inputTokens", 0), - "output": usage.get("outputTokens", 0), - "total": usage.get("totalTokens", 0) - }, - metadata=metadata - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with output - " - f"input_tokens={usage.get('inputTokens', 0)}, " - f"output_tokens={usage.get('outputTokens', 0)}, " - f"has_reasoning={has_reasoning}, " - f"stop_reason={response.get('stopReason')}") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + langfuse_client.update_current_generation( + output=output_message, + usage={ + "input": usage.get("inputTokens", 0), + "output": usage.get("outputTokens", 0), + "total": usage.get("totalTokens", 0) + }, + metadata=metadata + ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with output - " + f"input_tokens={usage.get('inputTokens', 0)}, " + f"output_tokens={usage.get('outputTokens', 0)}, " + f"has_reasoning={has_reasoning}, " + f"stop_reason={response.get('stopReason')}") + except Exception as e: + logger.warning(f"Failed to update Langfuse: {e}") except bedrock_runtime.exceptions.ValidationException as e: error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}" logger.error(error_message) - langfuse_context.update_current_observation(level="ERROR", status_message=error_message) - if DEBUG: - logger.info(f"Langfuse: Updated observation with ValidationException error") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + 
langfuse_client.update_current_generation(level="ERROR", status_message=error_message) + if DEBUG: + logger.info(f"Langfuse: Updated observation with ValidationException error") + except Exception: + pass raise HTTPException(status_code=400, detail=str(e)) except bedrock_runtime.exceptions.ThrottlingException as e: error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}" logger.warning(error_message) - langfuse_context.update_current_observation(level="WARNING", status_message=error_message) - if DEBUG: - logger.info(f"Langfuse: Updated observation with ThrottlingException warning") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + langfuse_client.update_current_generation(level="WARNING", status_message=error_message) + if DEBUG: + logger.info(f"Langfuse: Updated observation with ThrottlingException warning") + except Exception: + pass raise HTTPException(status_code=429, detail=str(e)) except Exception as e: error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}" logger.error(error_message) - langfuse_context.update_current_observation(level="ERROR", status_message=error_message) - if DEBUG: - logger.info(f"Langfuse: Updated observation with generic Exception error") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + langfuse_client.update_current_generation(level="ERROR", status_message=error_message) + if DEBUG: + logger.info(f"Langfuse: Updated observation with generic Exception error") + except Exception: + pass raise HTTPException(status_code=500, detail=str(e)) return response @@ -447,17 +486,21 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: if metadata: update_params["metadata"] = metadata - langfuse_context.update_current_observation(**update_params) - - if DEBUG: - output_length = len(accumulated_output) - logger.info(f"Langfuse: Updated observation with streaming output - " - f"chunks_count={output_length}, " - 
f"output_chars={len(final_output) if accumulated_output else 0}, " - f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " - f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, " - f"has_reasoning={has_reasoning}, " - f"finish_reason={finish_reason}") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + langfuse_client.update_current_generation(**update_params) + if DEBUG: + output_length = len(accumulated_output) + logger.info(f"Langfuse: Updated observation with streaming output - " + f"chunks_count={output_length}, " + f"output_chars={len(final_output) if accumulated_output else 0}, " + f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " + f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, " + f"has_reasoning={has_reasoning}, " + f"finish_reason={finish_reason}") + except Exception as e: + logger.warning(f"Failed to update Langfuse: {e}") # return an [DONE] message at the end. yield self.stream_response_to_bytes() @@ -468,12 +511,17 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: except Exception as e: logger.error("Stream error for model %s: %s", chat_request.model, str(e)) # Update Langfuse with error - langfuse_context.update_current_observation( - level="ERROR", - status_message=f"Stream error: {str(e)}" - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") + langfuse_client = _get_langfuse_client() + if langfuse_client: + try: + langfuse_client.update_current_generation( + level="ERROR", + status_message=f"Stream error: {str(e)}" + ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") + except Exception: + pass error_event = Error(error=ErrorMessage(message=str(e))) yield self.stream_response_to_bytes(error_event) From f05755667ff21c48e8e7d187c7d168ba827ef598 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 
14 Nov 2025 14:31:38 +1100 Subject: [PATCH 14/19] fix: downgrade to Langfuse 2.x for API compatibility - Changed langfuse requirement from >=2.0.0 to <3.0.0 - Now using langfuse 2.60.10 (same as pipelines container) - Reverted to langfuse_context API (v2) - Verified langfuse_context.update_current_observation() works - Tested: No import errors, container starts successfully --- src/api/models/bedrock.py | 155 +++++++++++++------------------------- src/requirements.txt | 2 +- 2 files changed, 54 insertions(+), 103 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 8ccb2df4..73ac7d27 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -14,7 +14,7 @@ import tiktoken from botocore.config import Config from fastapi import HTTPException -from langfuse import observe, get_client +from langfuse.decorators import langfuse_context, observe from starlette.concurrency import run_in_threadpool from api.models.base import BaseChatModel, BaseEmbeddingsModel @@ -54,20 +54,6 @@ logger = logging.getLogger(__name__) -# Initialize Langfuse client -_langfuse_client = None - -def _get_langfuse_client(): - """Get or create the Langfuse client singleton.""" - global _langfuse_client - if _langfuse_client is None: - try: - _langfuse_client = get_client() - except Exception as e: - logger.warning(f"Failed to initialize Langfuse client: {e}") - _langfuse_client = None - return _langfuse_client - config = Config( connect_timeout=60, # Connection timeout: 60 seconds read_timeout=900, # Read timeout: 15 minutes (suitable for long streaming responses) @@ -267,23 +253,18 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): } # Update Langfuse generation with input metadata - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation( - input=messages, - model=model_id, - model_parameters=model_parameters, - metadata={ - 'system': args_clone.get('system', []), - 
'toolConfig': args_clone.get('toolConfig', {}), - 'stream': stream - } - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}") - except Exception as e: - logger.warning(f"Failed to update Langfuse: {e}") + langfuse_context.update_current_observation( + input=messages, + model=model_id, + model_parameters=model_parameters, + metadata={ + 'system': args_clone.get('system', []), + 'toolConfig': args_clone.get('toolConfig', {}), + 'stream': stream + } + ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}") try: if stream: @@ -321,61 +302,41 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): metadata["reasoning_content"] = reasoning_text metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation( - output=output_message, - usage={ - "input": usage.get("inputTokens", 0), - "output": usage.get("outputTokens", 0), - "total": usage.get("totalTokens", 0) - }, - metadata=metadata - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with output - " - f"input_tokens={usage.get('inputTokens', 0)}, " - f"output_tokens={usage.get('outputTokens', 0)}, " - f"has_reasoning={has_reasoning}, " - f"stop_reason={response.get('stopReason')}") - except Exception as e: - logger.warning(f"Failed to update Langfuse: {e}") + langfuse_context.update_current_observation( + output=output_message, + usage={ + "input": usage.get("inputTokens", 0), + "output": usage.get("outputTokens", 0), + "total": usage.get("totalTokens", 0) + }, + metadata=metadata + ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with output - " + f"input_tokens={usage.get('inputTokens', 0)}, " + f"output_tokens={usage.get('outputTokens', 0)}, " + f"has_reasoning={has_reasoning}, " + 
f"stop_reason={response.get('stopReason')}") except bedrock_runtime.exceptions.ValidationException as e: error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}" logger.error(error_message) - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation(level="ERROR", status_message=error_message) - if DEBUG: - logger.info(f"Langfuse: Updated observation with ValidationException error") - except Exception: - pass + langfuse_context.update_current_observation(level="ERROR", status_message=error_message) + if DEBUG: + logger.info("Langfuse: Updated observation with ValidationException error") raise HTTPException(status_code=400, detail=str(e)) except bedrock_runtime.exceptions.ThrottlingException as e: error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}" logger.warning(error_message) - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation(level="WARNING", status_message=error_message) - if DEBUG: - logger.info(f"Langfuse: Updated observation with ThrottlingException warning") - except Exception: - pass + langfuse_context.update_current_observation(level="WARNING", status_message=error_message) + if DEBUG: + logger.info("Langfuse: Updated observation with ThrottlingException warning") raise HTTPException(status_code=429, detail=str(e)) except Exception as e: error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}" logger.error(error_message) - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation(level="ERROR", status_message=error_message) - if DEBUG: - logger.info(f"Langfuse: Updated observation with generic Exception error") - except Exception: - pass + langfuse_context.update_current_observation(level="ERROR", status_message=error_message) + if DEBUG: + logger.info("Langfuse: Updated observation with generic 
Exception error") raise HTTPException(status_code=500, detail=str(e)) return response @@ -486,21 +447,16 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: if metadata: update_params["metadata"] = metadata - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation(**update_params) - if DEBUG: - output_length = len(accumulated_output) - logger.info(f"Langfuse: Updated observation with streaming output - " - f"chunks_count={output_length}, " - f"output_chars={len(final_output) if accumulated_output else 0}, " - f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " - f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, " - f"has_reasoning={has_reasoning}, " - f"finish_reason={finish_reason}") - except Exception as e: - logger.warning(f"Failed to update Langfuse: {e}") + langfuse_context.update_current_observation(**update_params) + if DEBUG: + output_length = len(accumulated_output) + logger.info(f"Langfuse: Updated observation with streaming output - " + f"chunks_count={output_length}, " + f"output_chars={len(final_output) if accumulated_output else 0}, " + f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " + f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, " + f"has_reasoning={has_reasoning}, " + f"finish_reason={finish_reason}") # return an [DONE] message at the end. 
yield self.stream_response_to_bytes() @@ -511,17 +467,12 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: except Exception as e: logger.error("Stream error for model %s: %s", chat_request.model, str(e)) # Update Langfuse with error - langfuse_client = _get_langfuse_client() - if langfuse_client: - try: - langfuse_client.update_current_generation( - level="ERROR", - status_message=f"Stream error: {str(e)}" - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") - except Exception: - pass + langfuse_context.update_current_observation( + level="ERROR", + status_message=f"Stream error: {str(e)}" + ) + if DEBUG: + logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") error_event = Error(error=ErrorMessage(message=str(e))) yield self.stream_response_to_bytes(error_event) diff --git a/src/requirements.txt b/src/requirements.txt index 1f8974b9..09c2d996 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -7,4 +7,4 @@ requests==2.32.4 numpy==2.2.5 boto3==1.40.4 botocore==1.40.4 -langfuse>=2.0.0 +langfuse<3.0.0 From 2afb11f28ceb2176a595d9060c7393081b125e9b Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 15:09:27 +1100 Subject: [PATCH 15/19] refactor: explicitly initialize langfuse client --- src/api/models/bedrock.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 73ac7d27..1f7024f0 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -14,6 +14,7 @@ import tiktoken from botocore.config import Config from fastapi import HTTPException +from langfuse import Langfuse from langfuse.decorators import langfuse_context, observe from starlette.concurrency import run_in_threadpool @@ -54,6 +55,18 @@ logger = logging.getLogger(__name__) +# Explicitly initialize Langfuse client for @observe decorator +# This ensures the consumer and auth 
check happen at module load time +try: + _langfuse = Langfuse( + debug=DEBUG + ) + if DEBUG: + logger.info("Langfuse client initialized successfully") +except Exception as e: + logger.warning(f"Failed to initialize Langfuse client: {e}") + _langfuse = None + config = Config( connect_timeout=60, # Connection timeout: 60 seconds read_timeout=900, # Read timeout: 15 minutes (suitable for long streaming responses) From c78cee3dad880e29c5c34d428400157c1a9ea040 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Fri, 14 Nov 2025 15:51:14 +1100 Subject: [PATCH 16/19] fix: read env variables --- src/api/models/bedrock.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 1f7024f0..f6247cea 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -59,6 +59,9 @@ # This ensures the consumer and auth check happen at module load time try: _langfuse = Langfuse( + public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"), + secret_key=os.environ.get("LANGFUSE_SECRET_KEY"), + host=os.environ.get("LANGFUSE_HOST"), debug=DEBUG ) if DEBUG: From 55ae73e23cb82663c31d9bc3328ef5030e263069 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Wed, 19 Nov 2025 23:00:37 +1100 Subject: [PATCH 17/19] fix: add trace-level metadata for all chat messages --- src/api/models/bedrock.py | 22 ++++++++-- src/api/routers/chat.py | 87 +++++++++++++++++++++++++++++++++++++-- src/api/schema.py | 2 +- 3 files changed, 103 insertions(+), 8 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index f6247cea..adbf1abd 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -438,7 +438,7 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: # All other chunks will also include a usage field, but with a null value. 
yield self.stream_response_to_bytes(stream_response) - # Update Langfuse with final streaming metadata + # Update Langfuse with final streaming metadata (both observation and trace) if final_usage or accumulated_output: update_params = {} if accumulated_output: @@ -463,10 +463,22 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: if metadata: update_params["metadata"] = metadata + # Update the child observation (Bedrock Converse) langfuse_context.update_current_observation(**update_params) + + # Also update the parent trace (chat_completion) with final output + trace_output = { + "message": { + "role": "assistant", + "content": final_output if accumulated_output else None, + }, + "finish_reason": finish_reason, + } + langfuse_context.update_current_trace(output=trace_output) + if DEBUG: output_length = len(accumulated_output) - logger.info(f"Langfuse: Updated observation with streaming output - " + logger.info(f"Langfuse: Updated observation and trace with streaming output - " f"chunks_count={output_length}, " f"output_chars={len(final_output) if accumulated_output else 0}, " f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " @@ -482,11 +494,15 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: raise except Exception as e: logger.error("Stream error for model %s: %s", chat_request.model, str(e)) - # Update Langfuse with error + # Update Langfuse with error (both observation and trace) langfuse_context.update_current_observation( level="ERROR", status_message=f"Stream error: {str(e)}" ) + langfuse_context.update_current_trace( + output={"error": str(e)}, + metadata={"error": True} + ) if DEBUG: logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") error_event = Error(error=ErrorMessage(message=str(e))) diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index 530f75d6..4c23f20c 100644 --- a/src/api/routers/chat.py +++ 
b/src/api/routers/chat.py @@ -1,7 +1,8 @@ from typing import Annotated -from fastapi import APIRouter, Body, Depends +from fastapi import APIRouter, Body, Depends, Header, Request from fastapi.responses import StreamingResponse +from langfuse.decorators import langfuse_context, observe from api.auth import api_key_auth from api.models.bedrock import BedrockModel @@ -15,10 +16,58 @@ ) +def extract_langfuse_metadata(chat_request: ChatRequest, headers: dict) -> dict: + """Extract Langfuse tracing metadata from request body and headers. + + Metadata can be provided via: + 1. extra_body.langfuse_metadata dict in the request + 2. HTTP headers: X-Chat-Id, X-User-Id, X-Session-Id, X-Message-Id + 3. user field in the request (for user_id) + + Returns a dict with: user_id, session_id, chat_id, message_id, and any custom metadata + """ + metadata = {} + + # Extract from extra_body if present + if chat_request.extra_body and isinstance(chat_request.extra_body, dict): + langfuse_meta = chat_request.extra_body.get("langfuse_metadata", {}) + if isinstance(langfuse_meta, dict): + metadata.update(langfuse_meta) + + # Extract from headers + headers_lower = {k.lower(): v for k, v in headers.items()} + + # Map headers to metadata fields - support both standard and OpenWebUI-prefixed headers + header_mapping = { + "x-chat-id": "chat_id", + "x-openwebui-chat-id": "chat_id", # OpenWebUI sends this format + "x-user-id": "user_id", + "x-openwebui-user-id": "user_id", # OpenWebUI sends this format + "x-session-id": "session_id", + "x-openwebui-session-id": "session_id", # OpenWebUI sends this format + "x-message-id": "message_id", + "x-openwebui-message-id": "message_id", # OpenWebUI sends this format + } + + for header_key, meta_key in header_mapping.items(): + if header_key in headers_lower and headers_lower[header_key]: + # Don't override if already set (standard headers take precedence) + if meta_key not in metadata: + metadata[meta_key] = headers_lower[header_key] + + # Use the 'user' 
field from request as user_id if not already set + if "user_id" not in metadata and chat_request.user: + metadata["user_id"] = chat_request.user + + return metadata + + @router.post( "/completions", response_model=ChatResponse | ChatStreamResponse | Error, response_model_exclude_unset=True ) +@observe(as_type="generation", name="chat_completion") async def chat_completions( + request: Request, chat_request: Annotated[ ChatRequest, Body( @@ -34,12 +83,42 @@ async def chat_completions( ), ], ): - if chat_request.model.lower().startswith("gpt-"): - chat_request.model = DEFAULT_MODEL + # Extract metadata for Langfuse tracing + metadata = extract_langfuse_metadata(chat_request, dict(request.headers)) + + # Create trace name using chat_id if available + trace_name = f"chat:{metadata.get('chat_id', 'unknown')}" + + # Update trace with metadata, user_id, and session_id + langfuse_context.update_current_trace( + name=trace_name, + user_id=metadata.get("user_id"), + session_id=metadata.get("session_id"), + metadata=metadata, + input={ + "model": chat_request.model, + "messages": [msg.model_dump() for msg in chat_request.messages], + "temperature": chat_request.temperature, + "max_tokens": chat_request.max_tokens, + "tools": [tool.model_dump() for tool in chat_request.tools] if chat_request.tools else None, + } + ) # Exception will be raised if model not supported. 
model = BedrockModel() model.validate(chat_request) + if chat_request.stream: return StreamingResponse(content=model.chat_stream(chat_request), media_type="text/event-stream") - return await model.chat(chat_request) + + response = await model.chat(chat_request) + + # Update trace with output for non-streaming + langfuse_context.update_current_trace( + output={ + "message": response.choices[0].message.model_dump() if response.choices else None, + "finish_reason": response.choices[0].finish_reason if response.choices else None, + } + ) + + return response diff --git a/src/api/schema.py b/src/api/schema.py index 233e1139..f7c829fc 100644 --- a/src/api/schema.py +++ b/src/api/schema.py @@ -99,7 +99,7 @@ class ChatRequest(BaseModel): stream_options: StreamOptions | None = None temperature: float | None = Field(default=1.0, le=2.0, ge=0.0) top_p: float | None = Field(default=1.0, le=1.0, ge=0.0) - user: str | None = None # Not used + user: str | None = None max_tokens: int | None = 2048 max_completion_tokens: int | None = None reasoning_effort: Literal["low", "medium", "high"] | None = None From 1bf485976f0aba28edd5d1bc95cb7b229feaca04 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Thu, 20 Nov 2025 11:09:34 +1100 Subject: [PATCH 18/19] fix: ensure Bedrock Converse observation nests properly in chat trace --- src/api/models/bedrock.py | 262 ++++++++++++++++++-------------------- 1 file changed, 124 insertions(+), 138 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index adbf1abd..0f98a987 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -248,7 +248,6 @@ def validate(self, chat_request: ChatRequest): detail=error, ) - @observe(as_type="generation", name="Bedrock Converse") async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): """Common logic for invoke bedrock models""" if DEBUG: @@ -259,29 +258,6 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): if DEBUG:
logger.info("Bedrock request: " + json.dumps(str(args))) - # Extract model metadata for Langfuse - args_clone = args.copy() - messages = args_clone.get('messages', []) - model_id = args_clone.get('modelId', 'unknown') - model_parameters = { - **args_clone.get('inferenceConfig', {}), - **args_clone.get('additionalModelRequestFields', {}) - } - - # Update Langfuse generation with input metadata - langfuse_context.update_current_observation( - input=messages, - model=model_id, - model_parameters=model_parameters, - metadata={ - 'system': args_clone.get('system', []), - 'toolConfig': args_clone.get('toolConfig', {}), - 'stream': stream - } - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}") - try: if stream: # Run the blocking boto3 call in a thread pool @@ -291,93 +267,99 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): else: # Run the blocking boto3 call in a thread pool response = await run_in_threadpool(bedrock_runtime.converse, **args) - - # For non-streaming, extract response metadata immediately - if response and not stream: - output_message = response.get("output", {}).get("message", {}) - usage = response.get("usage", {}) - - # Build metadata - metadata = { - "stopReason": response.get("stopReason"), - "ResponseMetadata": response.get("ResponseMetadata", {}) - } - - # Check for reasoning content in response - has_reasoning = False - reasoning_text = "" - if output_message and "content" in output_message: - for content_block in output_message.get("content", []): - if "reasoningContent" in content_block: - has_reasoning = True - reasoning_text = content_block.get("reasoningContent", {}).get("reasoningText", {}).get("text", "") - break - - if has_reasoning and reasoning_text: - metadata["has_extended_thinking"] = True - metadata["reasoning_content"] = reasoning_text - metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 - - 
langfuse_context.update_current_observation( - output=output_message, - usage={ - "input": usage.get("inputTokens", 0), - "output": usage.get("outputTokens", 0), - "total": usage.get("totalTokens", 0) - }, - metadata=metadata - ) - if DEBUG: - logger.info(f"Langfuse: Updated observation with output - " - f"input_tokens={usage.get('inputTokens', 0)}, " - f"output_tokens={usage.get('outputTokens', 0)}, " - f"has_reasoning={has_reasoning}, " - f"stop_reason={response.get('stopReason')}") except bedrock_runtime.exceptions.ValidationException as e: error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}" logger.error(error_message) - langfuse_context.update_current_observation(level="ERROR", status_message=error_message) - if DEBUG: - logger.info("Langfuse: Updated observation with ValidationException error") raise HTTPException(status_code=400, detail=str(e)) except bedrock_runtime.exceptions.ThrottlingException as e: error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}" logger.warning(error_message) - langfuse_context.update_current_observation(level="WARNING", status_message=error_message) - if DEBUG: - logger.info("Langfuse: Updated observation with ThrottlingException warning") raise HTTPException(status_code=429, detail=str(e)) except Exception as e: error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}" logger.error(error_message) - langfuse_context.update_current_observation(level="ERROR", status_message=error_message) - if DEBUG: - logger.info("Langfuse: Updated observation with generic Exception error") raise HTTPException(status_code=500, detail=str(e)) return response async def chat(self, chat_request: ChatRequest) -> ChatResponse: - """Default implementation for Chat API.""" - + """Default implementation for Chat API. + + Note: Works within the parent trace context created by @observe + decorator on chat_completions endpoint. 
Updates that trace context + with the response data. + """ message_id = self.generate_message_id() - response = await self._invoke_bedrock(chat_request) - - output_message = response["output"]["message"] - input_tokens = response["usage"]["inputTokens"] - output_tokens = response["usage"]["outputTokens"] - finish_reason = response["stopReason"] - - chat_response = self._create_response( - model=chat_request.model, - message_id=message_id, - content=output_message["content"], - finish_reason=finish_reason, - input_tokens=input_tokens, - output_tokens=output_tokens, - ) - if DEBUG: - logger.info("Proxy response :" + chat_response.model_dump_json()) - return chat_response + + try: + if DEBUG: + logger.info(f"Langfuse: Starting non-streaming request for model={chat_request.model}") + + response = await self._invoke_bedrock(chat_request) + + output_message = response["output"]["message"] + input_tokens = response["usage"]["inputTokens"] + output_tokens = response["usage"]["outputTokens"] + finish_reason = response["stopReason"] + + # Build metadata including usage info + trace_metadata = { + "model": chat_request.model, + "stream": False, + "stopReason": finish_reason, + "usage": { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens + }, + "ResponseMetadata": response.get("ResponseMetadata", {}) + } + + # Check for reasoning content in response + has_reasoning = False + reasoning_text = "" + if output_message and "content" in output_message: + for content_block in output_message.get("content", []): + if "reasoningContent" in content_block: + has_reasoning = True + reasoning_text = content_block.get("reasoningContent", {}).get("reasoningText", {}).get("text", "") + break + + if has_reasoning and reasoning_text: + trace_metadata["has_extended_thinking"] = True + trace_metadata["reasoning_content"] = reasoning_text + trace_metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 + + # Update trace 
with metadata + langfuse_context.update_current_trace( + metadata=trace_metadata + ) + + if DEBUG: + logger.info(f"Langfuse: Non-streaming response - " + f"input_tokens={input_tokens}, " + f"output_tokens={output_tokens}, " + f"has_reasoning={has_reasoning}, " + f"stop_reason={finish_reason}") + + chat_response = self._create_response( + model=chat_request.model, + message_id=message_id, + content=output_message["content"], + finish_reason=finish_reason, + input_tokens=input_tokens, + output_tokens=output_tokens, + ) + if DEBUG: + logger.info("Proxy response :" + chat_response.model_dump_json()) + return chat_response + except HTTPException: + # Re-raise HTTPException as-is + raise + except Exception as e: + logger.error("Chat error for model %s: %s", chat_request.model, str(e)) + if DEBUG: + logger.info(f"Langfuse: Error in non-streaming - error={str(e)[:100]}") + raise async def _async_iterate(self, stream): """Helper method to convert sync iterator to async iterator""" @@ -386,10 +368,21 @@ async def _async_iterate(self, stream): yield chunk async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: - """Default implementation for Chat Stream API""" + """Default implementation for Chat Stream API + + Note: For streaming, we work within the parent trace context created by @observe + decorator on chat_completions endpoint. We update that trace context with + streaming data as it arrives. 
+ """ try: if DEBUG: logger.info(f"Langfuse: Starting streaming request for model={chat_request.model}") + + # Parse request for metadata to log in parent trace + args = self._parse_request(chat_request) + messages = args.get('messages', []) + model_id = args.get('modelId', 'unknown') + response = await self._invoke_bedrock(chat_request, stream=True) message_id = self.generate_message_id() stream = response.get("stream") @@ -403,8 +396,8 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: has_reasoning = False async for chunk in self._async_iterate(stream): - args = {"model_id": chat_request.model, "message_id": message_id, "chunk": chunk} - stream_response = self._create_response_stream(**args) + args_chunk = {"model_id": chat_request.model, "message_id": message_id, "chunk": chunk} + stream_response = self._create_response_stream(**args_chunk) if not stream_response: continue @@ -438,49 +431,46 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: # All other chunks will also include a usage field, but with a null value. 
yield self.stream_response_to_bytes(stream_response) - # Update Langfuse with final streaming metadata (both observation and trace) + # Update Langfuse trace with final streaming output + # This updates the parent trace from chat_completions if final_usage or accumulated_output: - update_params = {} - if accumulated_output: - final_output = "".join(accumulated_output) - update_params["output"] = final_output - if final_usage: - update_params["usage"] = { - "input": final_usage.prompt_tokens, - "output": final_usage.completion_tokens, - "total": final_usage.total_tokens - } - # Build metadata - metadata = {} - if finish_reason: - metadata["finish_reason"] = finish_reason - if has_reasoning and accumulated_reasoning: - reasoning_text = "".join(accumulated_reasoning) - metadata["has_extended_thinking"] = True - metadata["reasoning_content"] = reasoning_text - # Estimate reasoning tokens (rough approximation: ~4 chars per token) - metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 - if metadata: - update_params["metadata"] = metadata - - # Update the child observation (Bedrock Converse) - langfuse_context.update_current_observation(**update_params) - - # Also update the parent trace (chat_completion) with final output + final_output = "".join(accumulated_output) if accumulated_output else None trace_output = { "message": { "role": "assistant", - "content": final_output if accumulated_output else None, + "content": final_output, }, "finish_reason": finish_reason, } - langfuse_context.update_current_trace(output=trace_output) + + # Build metadata including usage info + trace_metadata = { + "model": model_id, + "stream": True, + } + if finish_reason: + trace_metadata["finish_reason"] = finish_reason + if final_usage: + trace_metadata["usage"] = { + "prompt_tokens": final_usage.prompt_tokens, + "completion_tokens": final_usage.completion_tokens, + "total_tokens": final_usage.total_tokens + } + if has_reasoning and accumulated_reasoning: + reasoning_text = 
"".join(accumulated_reasoning) + trace_metadata["has_extended_thinking"] = True + trace_metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4 + + langfuse_context.update_current_trace( + output=trace_output, + metadata=trace_metadata + ) if DEBUG: output_length = len(accumulated_output) - logger.info(f"Langfuse: Updated observation and trace with streaming output - " + logger.info(f"Langfuse: Updated trace with streaming output - " f"chunks_count={output_length}, " - f"output_chars={len(final_output) if accumulated_output else 0}, " + f"output_chars={len(final_output) if final_output else 0}, " f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, " f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, " f"has_reasoning={has_reasoning}, " @@ -490,21 +480,17 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: yield self.stream_response_to_bytes() self.think_emitted = False # Cleanup except HTTPException: - # HTTPException already has Langfuse updated in _invoke_bedrock, re-raise it + # Re-raise HTTPException as-is raise except Exception as e: logger.error("Stream error for model %s: %s", chat_request.model, str(e)) - # Update Langfuse with error (both observation and trace) - langfuse_context.update_current_observation( - level="ERROR", - status_message=f"Stream error: {str(e)}" - ) + # Update Langfuse with error langfuse_context.update_current_trace( output={"error": str(e)}, - metadata={"error": True} + metadata={"error": True, "error_type": type(e).__name__} ) if DEBUG: - logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}") + logger.info(f"Langfuse: Updated trace with streaming error - error={str(e)[:100]}") error_event = Error(error=ErrorMessage(message=str(e))) yield self.stream_response_to_bytes(error_event) From ff97409ac96fc2e46ade24e34f08bcf6e97e7334 Mon Sep 17 00:00:00 2001 From: Jake Marsden Date: Thu, 20 Nov 2025 11:48:12 +1100 Subject: 
[PATCH 19/19] fix: pass user email for all chats --- src/api/routers/chat.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index 4c23f20c..23071d4c 100644 --- a/src/api/routers/chat.py +++ b/src/api/routers/chat.py @@ -21,10 +21,13 @@ def extract_langfuse_metadata(chat_request: ChatRequest, headers: dict) -> dict: Metadata can be provided via: 1. extra_body.langfuse_metadata dict in the request - 2. HTTP headers: X-Chat-Id, X-User-Id, X-Session-Id, X-Message-Id - 3. user field in the request (for user_id) + 2. user field in the request (PRIORITIZED for user_id - ensures consistent email usage) + 3. HTTP headers: X-Chat-Id, X-User-Id, X-Session-Id, X-Message-Id Returns a dict with: user_id, session_id, chat_id, message_id, and any custom metadata + + Note: The 'user' field is prioritized for user_id to ensure email addresses + are consistently used across all messages instead of generated IDs from headers. 
""" metadata = {} @@ -34,6 +37,11 @@ def extract_langfuse_metadata(chat_request: ChatRequest, headers: dict) -> dict: if isinstance(langfuse_meta, dict): metadata.update(langfuse_meta) + # PRIORITY: Set user_id from the 'user' field FIRST + # This ensures we always use the email address when available + if chat_request.user: + metadata["user_id"] = chat_request.user + # Extract from headers headers_lower = {k.lower(): v for k, v in headers.items()} @@ -51,14 +59,11 @@ def extract_langfuse_metadata(chat_request: ChatRequest, headers: dict) -> dict: for header_key, meta_key in header_mapping.items(): if header_key in headers_lower and headers_lower[header_key]: - # Don't override if already set (standard headers take precedence) + # Don't override if already set + # (chat_request.user takes precedence for user_id) if meta_key not in metadata: metadata[meta_key] = headers_lower[header_key] - # Use the 'user' field from request as user_id if not already set - if "user_id" not in metadata and chat_request.user: - metadata["user_id"] = chat_request.user - return metadata