Skip to content

Commit 5d9d16a

Browse files
committed
make automated tools async all the way down, address comments, revamp profile system to be easier to use for agents
1 parent 397a144 commit 5d9d16a

File tree

7 files changed

+126
-2324
lines changed

7 files changed

+126
-2324
lines changed

docs/topics/fenic-mcp.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ session.catalog.drop_tool("users_by_name_regex", ignore_if_not_exists=True)
140140

141141
### Step 2b: Create dynamic tools with `@fenic_tool`
142142

143-
Dynamic tools let you expose arbitrary Python logic as an MCP tool. They are defined with the `@fenic_tool` decorator and must return a Fenic `DataFrame`. Annotate parameters with `typing_extensions.Annotated` to provide per-argument descriptions in the tool schema. The server automatically adds `limit` and `table_format` keyword-only parameters for limiting the size of result sets and output formatting.
143+
Dynamic tools let you expose arbitrary Python logic as an MCP tool. They are defined with the `@fenic_tool` decorator and must return a Fenic `DataFrame`. Annotate parameters with `typing_extensions.Annotated` to provide per-argument descriptions in the tool schema. The server automatically adds `limit` and `table_format` keyword-only parameters for limiting the size of result sets and output formatting -- if the tool handles its own limiting, set `client_limit_parameter` to `False` to disable this behavior. The wrapped function can be async (recommended) or synchronous.
144144

145145
```python
146146
from typing_extensions import Annotated
@@ -178,7 +178,7 @@ orders_total = orders.group_by("user_id").agg(
178178
max_result_limit=100,
179179
default_table_format="markdown",
180180
)
181-
def users_with_min_spend(
181+
async def users_with_min_spend(
182182
name_regex: Annotated[Optional[str], "Regex for user name (use (?i) for case-insensitive)"] = None,
183183
min_total: Annotated[float, "Minimum total order amount"],
184184
) -> DataFrame:

src/fenic/api/mcp/tool_generation.py

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,28 @@
1515
import functools
1616
import hashlib
1717
import inspect
18-
import json
1918
import re
20-
from dataclasses import dataclass, asdict
21-
from typing import Callable, Dict, List, Literal, Optional, TypedDict, Union, Coroutine, Any
19+
from dataclasses import dataclass
20+
from inspect import iscoroutinefunction
21+
from typing import (
22+
Any,
23+
Callable,
24+
Coroutine,
25+
Dict,
26+
List,
27+
Literal,
28+
Optional,
29+
Union,
30+
)
2231

23-
from fastmcp.server.context import Context
2432
import polars as pl
33+
from fastmcp.server.context import Context
2534
from typing_extensions import Annotated
2635

2736
from fenic.api.dataframe.dataframe import DataFrame
2837
from fenic.api.functions import (
2938
avg,
3039
col,
31-
count,
3240
stddev,
3341
)
3442
from fenic.api.functions import max as max_
@@ -47,6 +55,8 @@
4755
StringType,
4856
)
4957

58+
PROFILE_MAX_SAMPLE_SIZE = 10_000
59+
5060

5161
@dataclass
5262
class DatasetSpec:
@@ -101,18 +111,28 @@ def fenic_tool(
101111
tool_name: str,
102112
tool_description: str,
103113
max_result_limit: Optional[int] = None,
114+
client_limit_parameter: bool = True,
104115
default_table_format: TableFormat = "markdown",
105116
read_only: bool = True,
106117
idempotent: bool = True,
107118
destructive: bool = False,
108119
open_world: bool = False,
109-
) -> Callable[[Callable[..., Coroutine[Any, Any, DataFrame]]], DynamicToolDefinition]:
120+
) -> Callable[[
121+
Union[
122+
Callable[..., Coroutine[Any, Any, DataFrame]],
123+
Callable[..., DataFrame]
124+
]], DynamicToolDefinition]:
110125
"""Decorator to bind a DataFrame to a user-authored tool function.
111126
127+
Can be added to a synchronous or asynchronous (recommended) tool function.
128+
Function based tools (dynamic tools) cannot be persisted to the catalog.
129+
See the [Fenic MCP documentation](https://fenic.ai/docs/topics/fenic-mcp) for more details.
130+
112131
Args:
113132
tool_name: The name of the tool.
114133
tool_description: The description of the tool.
115-
max_result_limit: The maximum number of results to return.
134+
max_result_limit: The maximum number of results to return. If omitted, no limit will be enforced.
135+
client_limit_parameter: Whether to add a client-side limit parameter to the tool.
116136
default_table_format: The default table format to return.
117137
read_only: A hint to provide to the model that the tool does not modify its environment.
118138
idempotent: A hint to provide to the model that calling the tool multiple times with the same input will always return the same result (redundant if read_only is True).
@@ -136,10 +156,10 @@ def find_rust(
136156
137157
Example: Creating an open-world tool that reaches out to an external API. The open_world flag indicates to the model that the tool may interact with an "open world" of external entities
138158
@fenic_tool(tool_name="search_knowledge_base", tool_description="...", open_world=True)
139-
def search_knowledge_base(
159+
async def search_knowledge_base(
140160
query: Annotated[str, "Knowledge base search query"],
141161
) -> DataFrame:
142-
results = requests.get(...)
162+
results = await async_http_client.get(...)  # NOTE(review): `requests.get` is synchronous and not awaitable — use an async client (e.g. httpx/aiohttp)
143163
return fc.create_dataframe(results)
144164
145165
Notes:
@@ -149,20 +169,26 @@ def search_knowledge_base(
149169
- The returned object is a DynamicTool ready for registration.
150170
- A `limit` parameter is automatically added to the function signature, which can be used to limit the number of rows returned up to the tool's `max_result_limit`.
151171
- A `table_format` parameter is automatically added to the function signature, which can be used to specify the format of the returned data (markdown, structured)
172+
- The `client_limit_parameter` flag can be used to control whether the client is allowed to specify a limit parameter.
152173
"""
153174

154-
def decorator(func: Callable[..., Coroutine[Any, Any, DataFrame]]) -> DynamicToolDefinition:
175+
def decorator(
176+
func: Union[Callable[..., Coroutine[Any, Any, DataFrame]], Callable[..., DataFrame]]) -> DynamicToolDefinition:
155177
_ensure_no_var_args(func, func_label=tool_name)
156178

157179
@functools.wraps(func)
158180
async def wrapper(*args, **kwargs) -> LogicalPlan:
159-
result_df = await func(*args, **kwargs)
181+
if iscoroutinefunction(func):
182+
result_df = await func(*args, **kwargs)
183+
else:
184+
result_df = await asyncio.to_thread(lambda: func(*args, **kwargs))
160185
return result_df._logical_plan
161186

162187
return DynamicToolDefinition(
163188
name=tool_name,
164189
description=tool_description,
165190
max_result_limit=max_result_limit,
191+
add_limit_parameter=client_limit_parameter,
166192
default_table_format=default_table_format,
167193
read_only=read_only,
168194
idempotent=idempotent,
@@ -296,7 +322,7 @@ async def search_summary(
296322
)
297323

298324

299-
def auto_generate_search_content_tool(
325+
def _auto_generate_search_content_tool(
300326
datasets: List[DatasetSpec],
301327
session: Session,
302328
tool_name: str,
@@ -464,13 +490,10 @@ async def analyze_func(
464490
"- For text search, prefer regular expressions using REGEXP_MATCHES().\n",
465491
"- Paging: use ORDER BY to define row order, then LIMIT and OFFSET for pages.\n",
466492
f"- Results are limited to {result_limit} rows, use LIMIT/OFFSET to paginate when receiving a result set of {result_limit} or more rows.\n",
467-
"Examples:\n", # nosec B608 - example text only
468-
f"- SELECT * FROM {{{example_name}}} WHERE REGEXP_MATCHES(message, '(?i)error|fail') LIMIT {result_limit}",
469-
# nosec B608 - example text only
470-
f"- SELECT dept, COUNT(*) AS n FROM {{{example_name}}} WHERE status = 'active' GROUP BY dept HAVING n > 10 ORDER BY n DESC LIMIT {result_limit}",
471-
# nosec B608 - example text only
472-
f"- Paging: page 2 of size {result_limit}\n SELECT * FROM {{{example_name}}} ORDER BY created_at DESC LIMIT {result_limit} OFFSET {result_limit}",
473-
# nosec B608 - example text only
493+
"Examples:\n",
494+
f"- SELECT * FROM {{{example_name}}} WHERE REGEXP_MATCHES(message, '(?i)error|fail') LIMIT {result_limit}", # nosec B608 - example text only
495+
f"- SELECT dept, COUNT(*) AS n FROM {{{example_name}}} WHERE status = 'active' GROUP BY dept HAVING n > 10 ORDER BY n DESC LIMIT {result_limit}", # nosec B608 - example text only
496+
f"- Paging: page 2 of size {result_limit}\n SELECT * FROM {{{example_name}}} ORDER BY created_at DESC LIMIT {result_limit} OFFSET {result_limit}", # nosec B608 - example text only
474497
]
475498
)
476499
enhanced_description = "\n".join(lines)
@@ -543,7 +566,7 @@ def _apply_paging(
543566

544567

545568
@dataclass
546-
class ProfileRow:
569+
class _ProfileRow:
547570
dataset_name: str
548571
column_name: str
549572
data_type: str
@@ -584,26 +607,9 @@ def _auto_generate_profile_tool(
584607
raise ValueError("Cannot create profile tool: no datasets provided.")
585608
tool_key = _sanitize_name(tool_name)
586609

587-
async def _materialize_dataset_description(df: DataFrame, dataset_name: str, view_name: str) -> None:
588-
profile_rows = await _compute_profile_rows(df, dataset_name, topk_distinct)
589-
pl_df = pl.DataFrame(profile_rows)
590-
plan = InMemorySource.from_session_state(pl_df, session._session_state)
591-
catalog = session._session_state.catalog
592-
catalog.drop_view(view_name, ignore_if_not_exists=True)
593-
catalog.create_view(view_name, plan)
594-
595-
async def _ensure_profile_view_for_dataset(spec: DatasetSpec, refresh: bool) -> LogicalPlan:
596-
schema_hash = _schema_fingerprint(spec.df)
597-
view_name = f"__fenic_profile__{tool_key}__{_sanitize_name(spec.table_name)}__{schema_hash}"
598-
catalog = session._session_state.catalog
599-
if refresh or not catalog.does_view_exist(view_name):
600-
await _materialize_dataset_description(spec.df, spec.table_name, view_name)
601-
return catalog.get_view_plan(view_name)
602-
603610
async def profile_func(
604611
df_name: Annotated[
605612
str | None, "Optional DataFrame name to return a single profile for. To return profiles for all datasets, omit this parameter."] = None,
606-
refresh: Annotated[bool, "Recompute and refresh cached profile view(s)"] = False,
607613
) -> LogicalPlan:
608614
# sometimes the models get...very confused, and pass the null string instead of `null` or omitting the field entirely
609615
if not df_name or df_name == "null":
@@ -614,13 +620,12 @@ async def profile_func(
614620
if spec is None:
615621
raise ValidationError(
616622
f"Unknown dataset '{df_name}'. Available: {', '.join(d.table_name for d in datasets)}")
617-
return await _ensure_profile_view_for_dataset(spec, refresh)
623+
return await _ensure_profile_view_for_dataset(session, tool_key, spec, topk_distinct)
618624

619625
# Multi-dataset: concatenate cached views (or compute & cache if missing)
620626
profile_df = None
621627
for spec in datasets:
622-
# Ensure view exists and read it, then convert to polars for concatenation
623-
plan = await _ensure_profile_view_for_dataset(spec, refresh)
628+
plan = await _ensure_profile_view_for_dataset(session, tool_key, spec, topk_distinct)
624629
df = DataFrame._from_logical_plan(plan, session_state=session._session_state)
625630
if not profile_df:
626631
profile_df = df
@@ -636,21 +641,42 @@ async def profile_func(
636641
max_result_limit=None,
637642
)
638643

644+
async def _ensure_profile_view_for_dataset(
645+
session: Session,
646+
tool_key: str,
647+
spec: DatasetSpec,
648+
topk_distinct: int,
649+
) -> LogicalPlan:
650+
schema_hash = _schema_fingerprint(spec.df)
651+
view_name = f"__fenic_profile__{tool_key}__{_sanitize_name(spec.table_name)}__{schema_hash}"
652+
catalog = session._session_state.catalog
653+
if not catalog.does_view_exist(view_name):
654+
profile_rows = await _compute_profile_rows(
655+
spec.df,
656+
spec.table_name,
657+
topk_distinct,
658+
)
659+
view_plan = InMemorySource.from_session_state(
660+
pl.DataFrame(profile_rows), session._session_state,
661+
)
662+
catalog.create_view(view_name, view_plan)
663+
return catalog.get_view_plan(view_name)
664+
639665
async def _compute_profile_rows(
640666
df: DataFrame,
641667
dataset_name: str,
642-
topk_distinct: int
643-
) -> List[ProfileRow]:
668+
topk_distinct: int,
669+
) -> List[_ProfileRow]:
644670
pl_df = df.to_polars()
645671
total_rows = pl_df.height
646-
sampled_df = pl_df.sample(10000)
647-
rows_list: List[ProfileRow] = []
672+
sampled_df = pl_df.sample(min(total_rows, PROFILE_MAX_SAMPLE_SIZE))
673+
rows_list: List[_ProfileRow] = []
648674
for field in df.schema.column_fields:
649675
col_name = field.name
650676
dtype_str = str(field.data_type)
651677
null_count = sampled_df.select(pl.col(col_name).is_null().sum()).item()
652678
non_null_count = sampled_df.height - null_count
653-
stats: ProfileRow = ProfileRow(
679+
stats = _ProfileRow(
654680
dataset_name=dataset_name,
655681
column_name=col_name,
656682
data_type=dtype_str,
@@ -815,8 +841,7 @@ def _auto_generate_core_tools(
815841
"Return dataset data profile: row_count and per-column stats for any or all of the datasets listed below.",
816842
"This call should be used as a follow-up after calling the `Schema` tool.",
817843
"Numeric stats: min/max/mean/std; Booleans: true/false counts; Strings: distinct_count and top values.",
818-
"Results are cached per tool name and schema fingerprint; pass refresh=true to recompute.",
819-
"Profiles statistics are calculated across a sample of the original dataset.",
844+
"Profiling statistics are calculated across a sample of the original dataset.",
820845
"Available Datasets:",
821846
group_desc,
822847
]),
@@ -845,7 +870,7 @@ def _auto_generate_core_tools(
845870
group_desc,
846871
]),
847872
)
848-
search_content_tool = auto_generate_search_content_tool(
873+
search_content_tool = _auto_generate_search_content_tool(
849874
datasets,
850875
session,
851876
tool_name=f"{tool_group_name} - Search Content",

src/fenic/core/mcp/_server.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -147,30 +147,24 @@ def _handle_result_set(
147147
original_result_count = len(pl_df)
148148
if effective_limit and original_result_count > effective_limit:
149149
pl_df = pl_df.limit(effective_limit)
150+
schema_fields = [{"name": name, "type": str(dtype)} for name, dtype in pl_df.schema.items()]
151+
rows_list = pl_df.to_dicts()
152+
returned_result_count = len(rows_list)
150153
if table_format == "structured":
151-
rows_list = pl_df.to_dicts()
152-
schema_fields = [{"name": name, "type": str(dtype)} for name, dtype in pl_df.schema.items()]
153154
result_set = MCPResultSet(
154155
table_schema=schema_fields,
155156
rows=rows_list,
156-
returned_result_count=len(rows_list),
157+
returned_result_count=returned_result_count,
157158
total_result_count=original_result_count,
158159
)
159160
else:
160-
with pl.Config(
161-
tbl_hide_dataframe_shape=True,
162-
tbl_cols=-1,
163-
tbl_rows=-1,
164-
tbl_width_chars=-1,
165-
fmt_str_lengths=25000 #TODO(bcallender): make this configurable
166-
):
167-
rows = repr(pl_df)
168-
result_set = MCPResultSet(
169-
table_schema=None,
170-
rows=rows,
171-
returned_result_count=len(pl_df),
172-
total_result_count=original_result_count,
173-
)
161+
rows = _render_markdown_preview(rows_list)
162+
result_set = MCPResultSet(
163+
table_schema=schema_fields,
164+
rows=rows,
165+
returned_result_count=returned_result_count,
166+
total_result_count=original_result_count,
167+
)
174168
return result_set
175169

176170
def _build_parameterized_tool(self, tool: ParameterizedToolDefinition):

src/fenic/core/mcp/types.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Exported Types related to Parameterized View/MCP Tool Generation."""
22
from __future__ import annotations
33

4-
from typing import Annotated, Callable, List, Optional, Union, Coroutine, Any
4+
from typing import Annotated, Any, Callable, Coroutine, List, Optional, Union
55

66
from pydantic import BaseModel, ConfigDict, model_validator
77
from pydantic.dataclasses import dataclass
@@ -80,16 +80,17 @@ class ParameterizedToolDefinition:
8080
class DynamicToolDefinition:
8181
"""A tool implemented as a regular Python function with explicit parameters.
8282
83-
The function must be a `Callable[..., LogicalPlan]`. Collection/formatting is handled by
83+
The function must be a `Callable[..., Coroutine[Any, Any, LogicalPlan]]`
84+
(a function defined with `async def`). Collection/formatting is handled by
8485
the MCP generator wrapper.
8586
"""
8687
name: str
8788
description: str
8889
max_result_limit: Optional[int]
8990
func: Callable[..., Coroutine[Any, Any, LogicalPlan]]
90-
add_limit_parameter: bool = True
91+
add_limit_parameter: bool = True
9192
default_table_format: TableFormat = "markdown"
92-
read_only: Annotated[bool, "A hint to provide to the model that the tool is read-only."] = True
93-
idempotent: Annotated[bool, "A hint to provide to the model that the tool is idempotent."] = True
94-
destructive: Annotated[bool, "A hint to provide to the model that the tool is destructive."] = False
95-
open_world: Annotated[bool, "A hint to provide to the model that the tool reaches out to external endpoints/knowledge bases."] = False
93+
read_only: Annotated[bool, "A hint to provide to the client that the tool is read-only."] = True
94+
idempotent: Annotated[bool, "A hint to provide to the client that the tool is idempotent."] = True
95+
destructive: Annotated[bool, "A hint to provide to the client that the tool is destructive."] = False
96+
open_world: Annotated[bool, "A hint to provide to the client that the tool reaches out to external endpoints/knowledge bases."] = False

0 commit comments

Comments
 (0)