-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Add metadata to the Agent class. #3370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| HistoryProcessor, | ||
| ModelRequestNode, | ||
| UserPromptNode, | ||
| build_run_context, | ||
| capture_run_messages, | ||
| ) | ||
| from .._output import OutputToolset | ||
|
|
@@ -89,6 +90,8 @@ | |
| S = TypeVar('S') | ||
| NoneType = type(None) | ||
|
|
||
| AgentMetadataValue = str | dict[str, str] | Callable[[RunContext[AgentDepsT]], str | dict[str, str]] | ||
|
|
||
|
|
||
| @dataclasses.dataclass(init=False) | ||
| class Agent(AbstractAgent[AgentDepsT, OutputDataT]): | ||
|
|
@@ -130,6 +133,7 @@ class Agent(AbstractAgent[AgentDepsT, OutputDataT]): | |
| """Options to automatically instrument with OpenTelemetry.""" | ||
|
|
||
| _instrument_default: ClassVar[InstrumentationSettings | bool] = False | ||
| _metadata: AgentMetadataValue[AgentDepsT] | None = dataclasses.field(repr=False) | ||
|
|
||
| _deps_type: type[AgentDepsT] = dataclasses.field(repr=False) | ||
| _output_schema: _output.OutputSchema[OutputDataT] = dataclasses.field(repr=False) | ||
|
|
@@ -175,6 +179,7 @@ def __init__( | |
| defer_model_check: bool = False, | ||
| end_strategy: EndStrategy = 'early', | ||
| instrument: InstrumentationSettings | bool | None = None, | ||
| metadata: AgentMetadataValue[AgentDepsT] | None = None, | ||
| history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None, | ||
| event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, | ||
| ) -> None: ... | ||
|
|
@@ -201,6 +206,7 @@ def __init__( | |
| defer_model_check: bool = False, | ||
| end_strategy: EndStrategy = 'early', | ||
| instrument: InstrumentationSettings | bool | None = None, | ||
| metadata: AgentMetadataValue[AgentDepsT] | None = None, | ||
| history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None, | ||
| event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, | ||
| ) -> None: ... | ||
|
|
@@ -225,6 +231,7 @@ def __init__( | |
| defer_model_check: bool = False, | ||
| end_strategy: EndStrategy = 'early', | ||
| instrument: InstrumentationSettings | bool | None = None, | ||
| metadata: AgentMetadataValue[AgentDepsT] | None = None, | ||
| history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None, | ||
| event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, | ||
| **_deprecated_kwargs: Any, | ||
|
|
@@ -276,6 +283,10 @@ def __init__( | |
| [`Agent.instrument_all()`][pydantic_ai.Agent.instrument_all] | ||
| will be used, which defaults to False. | ||
| See the [Debugging and Monitoring guide](https://ai.pydantic.dev/logfire/) for more info. | ||
| metadata: Optional metadata to attach to telemetry for this agent. | ||
| Provide a string literal, a dict of string keys and values, or a callable returning one of those values | ||
| computed from the [`RunContext`][pydantic_ai.tools.RunContext] on each run. | ||
| Metadata is only recorded when instrumentation is enabled. | ||
| history_processors: Optional list of callables to process the message history before sending it to the model. | ||
| Each processor takes a list of messages and returns a modified list of messages. | ||
| Processors can be sync or async and are applied in sequence. | ||
|
|
@@ -292,6 +303,7 @@ def __init__( | |
|
|
||
| self._output_type = output_type | ||
| self.instrument = instrument | ||
| self._metadata = metadata | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about the sunder, and that it's not represented in repr. I went by feeling. |
||
| self._deps_type = deps_type | ||
|
|
||
| if mcp_servers := _deprecated_kwargs.pop('mcp_servers', None): | ||
|
|
@@ -349,6 +361,9 @@ def __init__( | |
| self._override_instructions: ContextVar[ | ||
| _utils.Option[list[str | _system_prompt.SystemPromptFunc[AgentDepsT]]] | ||
| ] = ContextVar('_override_instructions', default=None) | ||
| self._override_metadata: ContextVar[_utils.Option[AgentMetadataValue[AgentDepsT]]] = ContextVar( | ||
| '_override_metadata', default=None | ||
| ) | ||
|
|
||
| self._enter_lock = Lock() | ||
| self._entered_count = 0 | ||
|
|
@@ -645,6 +660,7 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None: | |
| }, | ||
| ) | ||
|
|
||
| run_metadata: str | dict[str, str] | None = None | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could technically set a non-callable metadata even if the run fails, and the ctx isn't available. What do you think? I opted for the simpler implementation. |
||
| try: | ||
| async with graph.iter( | ||
| inputs=user_prompt_node, | ||
|
|
@@ -656,8 +672,10 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None: | |
| async with toolset: | ||
| agent_run = AgentRun(graph_run) | ||
| yield agent_run | ||
| if (final_result := agent_run.result) is not None and run_span.is_recording(): | ||
| if instrumentation_settings and instrumentation_settings.include_content: | ||
| final_result = agent_run.result | ||
| if instrumentation_settings and run_span.is_recording(): | ||
| run_metadata = self._compute_agent_metadata(build_run_context(agent_run.ctx)) | ||
| if instrumentation_settings.include_content and final_result is not None: | ||
| run_span.set_attribute( | ||
| 'final_result', | ||
| ( | ||
|
|
@@ -671,18 +689,32 @@ async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None: | |
| if instrumentation_settings and run_span.is_recording(): | ||
| run_span.set_attributes( | ||
| self._run_span_end_attributes( | ||
| instrumentation_settings, usage, state.message_history, graph_deps.new_message_index | ||
| instrumentation_settings, | ||
| usage, | ||
| state.message_history, | ||
| graph_deps.new_message_index, | ||
| run_metadata, | ||
| ) | ||
| ) | ||
| finally: | ||
| run_span.end() | ||
|
|
||
| def _compute_agent_metadata(self, ctx: RunContext[AgentDepsT]) -> str | dict[str, str] | None: | ||
| metadata_override = self._override_metadata.get() | ||
| metadata_config = metadata_override.value if metadata_override is not None else self._metadata | ||
| if metadata_config is None: | ||
| return None | ||
|
|
||
| metadata = metadata_config(ctx) if callable(metadata_config) else metadata_config | ||
| return metadata | ||
|
|
||
| def _run_span_end_attributes( | ||
| self, | ||
| settings: InstrumentationSettings, | ||
| usage: _usage.RunUsage, | ||
| message_history: list[_messages.ModelMessage], | ||
| new_message_index: int, | ||
| metadata: str | dict[str, str] | None = None, | ||
| ): | ||
| if settings.version == 1: | ||
| attrs = { | ||
|
|
@@ -716,6 +748,12 @@ def _run_span_end_attributes( | |
| ): | ||
| attrs['pydantic_ai.variable_instructions'] = True | ||
|
|
||
| if metadata is not None: | ||
| if isinstance(metadata, dict): | ||
| attrs['logfire.agent.metadata'] = json.dumps(metadata) | ||
| else: | ||
| attrs['logfire.agent.metadata'] = metadata | ||
|
|
||
| return { | ||
| **usage.opentelemetry_attributes(), | ||
| **attrs, | ||
|
|
@@ -740,6 +778,7 @@ def override( | |
| toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET, | ||
| tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET, | ||
| instructions: Instructions[AgentDepsT] | _utils.Unset = _utils.UNSET, | ||
| metadata: AgentMetadataValue[AgentDepsT] | _utils.Unset = _utils.UNSET, | ||
| ) -> Iterator[None]: | ||
| """Context manager to temporarily override agent name, dependencies, model, toolsets, tools, or instructions. | ||
|
|
||
|
|
@@ -753,6 +792,7 @@ def override( | |
| toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run. | ||
| tools: The tools to use instead of the tools registered with the agent. | ||
| instructions: The instructions to use instead of the instructions registered with the agent. | ||
| metadata: The metadata to use instead of the metadata passed to the agent constructor. | ||
| """ | ||
| if _utils.is_set(name): | ||
| name_token = self._override_name.set(_utils.Some(name)) | ||
|
|
@@ -785,6 +825,11 @@ def override( | |
| else: | ||
| instructions_token = None | ||
|
|
||
| if _utils.is_set(metadata): | ||
| metadata_token = self._override_metadata.set(_utils.Some(metadata)) | ||
| else: | ||
| metadata_token = None | ||
|
|
||
| try: | ||
| yield | ||
| finally: | ||
|
|
@@ -800,6 +845,8 @@ def override( | |
| self._override_tools.reset(tools_token) | ||
| if instructions_token is not None: | ||
| self._override_instructions.reset(instructions_token) | ||
| if metadata_token is not None: | ||
| self._override_metadata.reset(metadata_token) | ||
|
|
||
| @overload | ||
| def instructions( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opted to not introduce a generic
MetadataTto the Agent, but instead allow strings or dicts, that get serialized into JSON-strings for the OTel attribute.