Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions examples/agent_patterns/human_in_the_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Human-in-the-loop example with tool approval.

This example demonstrates how to:
1. Define tools that require approval before execution
2. Handle interruptions when tool approval is needed
3. Serialize/deserialize run state to continue execution later
4. Approve or reject tool calls based on user input
"""

import asyncio
import json

from agents import Agent, Runner, RunState, ToolApprovalItem, function_tool


@function_tool
async def get_weather(city: str) -> str:
"""Get the weather for a given city.

Args:
city: The city to get weather for.

Returns:
Weather information for the city.
"""
return f"The weather in {city} is sunny"


async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
"""Check if temperature tool needs approval."""
return "Oakland" in params.get("city", "")


@function_tool(
# Dynamic approval: only require approval for Oakland
needs_approval=_needs_temperature_approval
)
async def get_temperature(city: str) -> str:
"""Get the temperature for a given city.

Args:
city: The city to get temperature for.

Returns:
Temperature information for the city.
"""
return f"The temperature in {city} is 20° Celsius"


# Main agent with tool that requires approval
agent = Agent(
name="Weather Assistant",
instructions=(
"You are a helpful weather assistant. "
"Answer questions about weather and temperature using the available tools."
),
tools=[get_weather, get_temperature],
)


async def confirm(question: str) -> bool:
"""Prompt user for yes/no confirmation.

Args:
question: The question to ask.

Returns:
True if user confirms, False otherwise.
"""
# Note: In a real application, you would use proper async input
# For now, using synchronous input with run_in_executor
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
normalized = answer.strip().lower()
return normalized in ("y", "yes")


async def main():
"""Run the human-in-the-loop example."""
result = await Runner.run(
agent,
"What is the weather and temperature in Oakland?",
)

has_interruptions = len(result.interruptions) > 0

while has_interruptions:
print("\n" + "=" * 80)
print("Run interrupted - tool approval required")
print("=" * 80)

# Storing state to file (demonstrating serialization)
state = result.to_state()
state_json = state.to_json()
with open("result.json", "w") as f:
json.dump(state_json, f, indent=2)

print("State saved to result.json")

# From here on you could run things on a different thread/process

# Reading state from file (demonstrating deserialization)
print("Loading state from result.json")
with open("result.json") as f:
stored_state_json = json.load(f)

state = await RunState.from_json(agent, stored_state_json)

# Process each interruption
for interruption in result.interruptions:
if not isinstance(interruption, ToolApprovalItem):
continue

print("\nTool call details:")
print(f" Agent: {interruption.agent.name}")
print(f" Tool: {interruption.raw_item.name}")
print(f" Arguments: {interruption.raw_item.arguments}")

confirmed = await confirm("\nDo you approve this tool call?")

if confirmed:
print(f"✓ Approved: {interruption.raw_item.name}")
state.approve(interruption)
else:
print(f"✗ Rejected: {interruption.raw_item.name}")
state.reject(interruption)

# Resume execution with the updated state
print("\nResuming agent execution...")
result = await Runner.run(agent, state)
has_interruptions = len(result.interruptions) > 0

print("\n" + "=" * 80)
print("Final Output:")
print("=" * 80)
print(result.final_output)


if __name__ == "__main__":
asyncio.run(main())
123 changes: 123 additions & 0 deletions examples/agent_patterns/human_in_the_loop_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Human-in-the-loop example with streaming.

This example demonstrates the human-in-the-loop (HITL) pattern with streaming.
The agent will pause execution when a tool requiring approval is called,
allowing you to approve or reject the tool call before continuing.

The streaming version provides real-time feedback as the agent processes
the request, then pauses for approval when needed.
"""

import asyncio

from agents import Agent, Runner, ToolApprovalItem, function_tool


async def _needs_temperature_approval(_ctx, params, _call_id) -> bool:
"""Check if temperature tool needs approval."""
return "Oakland" in params.get("city", "")


@function_tool(
# Dynamic approval: only require approval for Oakland
needs_approval=_needs_temperature_approval
)
async def get_temperature(city: str) -> str:
"""Get the temperature for a given city.

Args:
city: The city to get temperature for.

Returns:
Temperature information for the city.
"""
return f"The temperature in {city} is 20° Celsius"


@function_tool
async def get_weather(city: str) -> str:
"""Get the weather for a given city.

Args:
city: The city to get weather for.

Returns:
Weather information for the city.
"""
return f"The weather in {city} is sunny."


async def confirm(question: str) -> bool:
"""Prompt user for yes/no confirmation.

Args:
question: The question to ask.

Returns:
True if user confirms, False otherwise.
"""
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input, f"{question} (y/n): ")
return answer.strip().lower() in ["y", "yes"]


async def main():
"""Run the human-in-the-loop example."""
main_agent = Agent(
name="Weather Assistant",
instructions=(
"You are a helpful weather assistant. "
"Answer questions about weather and temperature using the available tools."
),
tools=[get_temperature, get_weather],
)

# Run the agent with streaming
result = Runner.run_streamed(
main_agent,
"What is the weather and temperature in Oakland?",
)
async for _ in result.stream_events():
pass # Process streaming events silently or could print them

# Handle interruptions
while len(result.interruptions) > 0:
print("\n" + "=" * 80)
print("Human-in-the-loop: approval required for the following tool calls:")
print("=" * 80)

state = result.to_state()

for interruption in result.interruptions:
if not isinstance(interruption, ToolApprovalItem):
continue

print("\nTool call details:")
print(f" Agent: {interruption.agent.name}")
print(f" Tool: {interruption.raw_item.name}")
print(f" Arguments: {interruption.raw_item.arguments}")

confirmed = await confirm("\nDo you approve this tool call?")

if confirmed:
print(f"✓ Approved: {interruption.raw_item.name}")
state.approve(interruption)
else:
print(f"✗ Rejected: {interruption.raw_item.name}")
state.reject(interruption)

# Resume execution with streaming
print("\nResuming agent execution...")
result = Runner.run_streamed(main_agent, state)
async for _ in result.stream_events():
pass # Process streaming events silently or could print them

print("\n" + "=" * 80)
print("Final Output:")
print("=" * 80)
print(result.final_output)
print("\nDone!")


if __name__ == "__main__":
asyncio.run(main())
117 changes: 117 additions & 0 deletions examples/memory/memory_session_hitl_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
Example demonstrating SQLite in-memory session with human-in-the-loop (HITL) tool approval.

This example shows how to use SQLite in-memory session memory combined with
human-in-the-loop tool approval. The session maintains conversation history while
requiring approval for specific tool calls.
"""

import asyncio

from agents import Agent, Runner, SQLiteSession, function_tool


async def _needs_approval(_ctx, _params, _call_id) -> bool:
"""Always require approval for weather tool."""
return True


@function_tool(needs_approval=_needs_approval)
def get_weather(location: str) -> str:
"""Get weather for a location.

Args:
location: The location to get weather for

Returns:
Weather information as a string
"""
# Simulated weather data
weather_data = {
"san francisco": "Foggy, 58°F",
"oakland": "Sunny, 72°F",
"new york": "Rainy, 65°F",
}
# Check if any city name is in the provided location string
location_lower = location.lower()
for city, weather in weather_data.items():
if city in location_lower:
return weather
return f"Weather data not available for {location}"


async def prompt_yes_no(question: str) -> bool:
"""Prompt user for yes/no answer.

Args:
question: The question to ask

Returns:
True if user answered yes, False otherwise
"""
print(f"\n{question} (y/n): ", end="", flush=True)
loop = asyncio.get_event_loop()
answer = await loop.run_in_executor(None, input)
normalized = answer.strip().lower()
return normalized in ("y", "yes")


async def main():
# Create an agent with a tool that requires approval
agent = Agent(
name="HITL Assistant",
instructions="You help users with information. Always use available tools when appropriate. Keep responses concise.",
tools=[get_weather],
)

# Create an in-memory SQLite session instance that will persist across runs
session = SQLiteSession(":memory:")
session_id = session.session_id

print("=== Memory Session + HITL Example ===")
print(f"Session id: {session_id}")
print("Enter a message to chat with the agent. Submit an empty line to exit.")
print("The agent will ask for approval before using tools.\n")

while True:
# Get user input
print("You: ", end="", flush=True)
loop = asyncio.get_event_loop()
user_message = await loop.run_in_executor(None, input)

if not user_message.strip():
break

# Run the agent
result = await Runner.run(agent, user_message, session=session)

# Handle interruptions (tool approvals)
while result.interruptions:
# Get the run state
state = result.to_state()

for interruption in result.interruptions:
tool_name = interruption.raw_item.name # type: ignore[union-attr]
args = interruption.raw_item.arguments or "(no arguments)" # type: ignore[union-attr]

approved = await prompt_yes_no(
f"Agent {interruption.agent.name} wants to call '{tool_name}' with {args}. Approve?"
)

if approved:
state.approve(interruption)
print("Approved tool call.")
else:
state.reject(interruption)
print("Rejected tool call.")

# Resume the run with the updated state
result = await Runner.run(agent, state, session=session)

# Display the response
reply = result.final_output or "[No final output produced]"
print(f"Assistant: {reply}\n")


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading