Skip to content

Commit c07292c

Browse files
committed
add initial tool and schema implementation
1 parent 302e774 commit c07292c

File tree

2 files changed

+199
-0
lines changed

2 files changed

+199
-0
lines changed

openhands/runtime/schema.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from typing import Any, TypeVar
2+
from pydantic import BaseModel, Field, ConfigDict, create_model
3+
4+
S = TypeVar("S", bound="Schema")
5+
6+
7+
def py_type(spec: dict[str, Any]) -> Any:
8+
"""Map JSON schema types to Python types."""
9+
t = spec.get("type")
10+
if t == "array":
11+
items = spec.get("items", {})
12+
inner = py_type(items) if isinstance(items, dict) else Any
13+
return list[inner] # type: ignore[index]
14+
if t == "object":
15+
return dict[str, Any]
16+
_map = {
17+
"string": str,
18+
"integer": int,
19+
"number": float,
20+
"boolean": bool,
21+
}
22+
if t in _map:
23+
return _map[t]
24+
return Any
25+
26+
27+
class Schema(BaseModel):
28+
"""Base schema for input action / output observation."""
29+
30+
model_config = ConfigDict(extra="forbid")
31+
32+
@classmethod
33+
def to_mcp_schema(cls) -> dict[str, Any]:
34+
"""Convert to JSON schema format compatible with MCP."""
35+
js = cls.model_json_schema()
36+
req = [n for n, f in cls.model_fields.items() if f.is_required()]
37+
return {
38+
"type": "object",
39+
"properties": js.get("properties", {}) or {},
40+
"required": req or [],
41+
}
42+
43+
@classmethod
44+
def from_mcp_schema(
45+
cls: type[S], model_name: str, schema: dict[str, Any]
46+
) -> type["S"]:
47+
"""Create a Schema subclass from an MCP/JSON Schema object."""
48+
assert isinstance(schema, dict), "Schema must be a dict"
49+
assert schema.get("type") == "object", "Only object schemas are supported"
50+
51+
props: dict[str, Any] = schema.get("properties", {}) or {}
52+
required = set(schema.get("required", []) or [])
53+
54+
fields: dict[str, tuple] = {}
55+
for fname, spec in props.items():
56+
tp = py_type(spec if isinstance(spec, dict) else {})
57+
default = ... if fname in required else None
58+
desc: str | None = (
59+
spec.get("description") if isinstance(spec, dict) else None
60+
)
61+
fields[fname] = (
62+
tp,
63+
Field(default=default, description=desc)
64+
if desc
65+
else Field(default=default),
66+
)
67+
return create_model(model_name, __base__=cls, **fields) # type: ignore[return-value]
68+
69+
70+
class ActionBase(Schema):
71+
"""Base schema for input action."""
72+
73+
pass
74+
75+
76+
class ObservationBase(Schema):
77+
"""Base schema for output observation."""
78+
79+
model_config = ConfigDict(extra="allow")

openhands/runtime/tool.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
from typing import Any, Callable
2+
from pydantic import BaseModel
3+
from .schema import ActionBase, ObservationBase, Schema
4+
5+
6+
class ToolAnnotations(BaseModel):
7+
title: str | None = None
8+
readOnlyHint: bool | None = None
9+
destructiveHint: bool | None = None
10+
idempotentHint: bool | None = None
11+
openWorldHint: bool | None = None
12+
13+
14+
class Tool:
15+
"""Tool that wraps an executor function with input/output validation and schema.
16+
17+
- Normalize input/output schemas (class or dict) into both model+schema.
18+
- Validate inputs before execute.
19+
- Coerce outputs only if an output model is defined; else return vanilla JSON.
20+
- Export MCP tool description.
21+
"""
22+
23+
def __init__(
24+
self,
25+
*,
26+
name: str,
27+
input_schema: type[ActionBase] | dict[str, Any],
28+
output_schema: type[ObservationBase] | dict[str, Any] | None = None,
29+
description: str | None = None,
30+
annotations: ToolAnnotations | None = None,
31+
_meta: dict[str, Any] | None = None,
32+
execute_fn: Callable[[ActionBase], ObservationBase] | None = None,
33+
):
34+
self.name = name
35+
self.description = description
36+
self.annotations = annotations
37+
self._meta = _meta
38+
self._set_input_schema(input_schema)
39+
self._set_output_schema(output_schema)
40+
41+
self.execute_fn = execute_fn
42+
43+
def _set_input_schema(
44+
self, input_schema: dict[str, Any] | type[ActionBase]
45+
) -> None:
46+
# ---- INPUT: class or dict -> model + schema
47+
self.action_type: type[ActionBase]
48+
self.input_schema: dict[str, Any]
49+
if isinstance(input_schema, type) and issubclass(input_schema, Schema):
50+
self.action_type = input_schema
51+
self.input_schema = input_schema.to_mcp_schema()
52+
elif isinstance(input_schema, dict):
53+
self.input_schema = input_schema
54+
self.action_type = ActionBase.from_mcp_schema(
55+
f"{self.name}Action", input_schema
56+
)
57+
else:
58+
raise TypeError(
59+
"input_schema must be ActionBase subclass or dict JSON schema"
60+
)
61+
62+
def _set_output_schema(
63+
self, output_schema: dict[str, Any] | type[ObservationBase] | None
64+
) -> None:
65+
# ---- OUTPUT: optional class or dict -> model + schema
66+
self.observation_type: type[ObservationBase] | None
67+
self.output_schema: dict[str, Any] | None
68+
if output_schema is None:
69+
self.observation_type = None
70+
self.output_schema = None
71+
elif isinstance(output_schema, type) and issubclass(output_schema, Schema):
72+
self.observation_type = output_schema
73+
self.output_schema = output_schema.to_mcp_schema()
74+
elif isinstance(output_schema, dict):
75+
self.output_schema = output_schema
76+
self.observation_type = ObservationBase.from_mcp_schema(
77+
f"{self.name}Observation", output_schema
78+
)
79+
else:
80+
raise TypeError(
81+
"output_schema must be ObservationBase subclass, dict, or None"
82+
)
83+
84+
def call(self, action: ActionBase) -> ObservationBase:
85+
if self.execute_fn is None:
86+
raise NotImplementedError(f"Tool '{self.name}' has no executor")
87+
88+
# Execute
89+
result = self.execute_fn(action)
90+
91+
# Coerce output only if we declared a model; else wrap in base ObservationBase
92+
if self.observation_type:
93+
if isinstance(result, self.observation_type):
94+
return result
95+
return self.observation_type.model_validate(result)
96+
else:
97+
# When no output schema is defined, wrap the result in ObservationBase
98+
if isinstance(result, ObservationBase):
99+
return result
100+
elif isinstance(result, BaseModel):
101+
return ObservationBase.model_validate(result.model_dump())
102+
elif isinstance(result, dict):
103+
return ObservationBase.model_validate(result)
104+
raise TypeError(
105+
"Output must be dict or BaseModel when no output schema is defined"
106+
)
107+
108+
def to_mcp_tool(self) -> dict[str, Any]:
109+
out = {
110+
"name": self.name,
111+
"description": self.description,
112+
"inputSchema": self.input_schema,
113+
}
114+
if self.annotations:
115+
out["annotations"] = self.annotations
116+
if self._meta is not None:
117+
out["_meta"] = self._meta
118+
if self.output_schema:
119+
out["outputSchema"] = self.output_schema
120+
return out

0 commit comments

Comments
 (0)