diff --git a/examples/demos/procurement_agent/.dockerignore b/examples/demos/procurement_agent/.dockerignore new file mode 100644 index 00000000..c4f7a8b4 --- /dev/null +++ b/examples/demos/procurement_agent/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store \ No newline at end of file diff --git a/examples/demos/procurement_agent/.gitignore b/examples/demos/procurement_agent/.gitignore new file mode 100644 index 00000000..127b65a2 --- /dev/null +++ b/examples/demos/procurement_agent/.gitignore @@ -0,0 +1,63 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Database files +*.db +*.sqlite +*.sqlite3 + +# Environment variables +.env +.env.local + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + +# Jupyter +.ipynb_checkpoints/ +*.ipynb_checkpoints + +# Testing +.pytest_cache/ +.coverage +htmlcov/ + +# UV +.venv/ +uv.lock diff --git a/examples/demos/procurement_agent/Dockerfile b/examples/demos/procurement_agent/Dockerfile new file mode 100644 index 00000000..17dd0e68 --- /dev/null +++ b/examples/demos/procurement_agent/Dockerfile @@ -0,0 +1,48 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + 
npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the pyproject.toml file to optimize caching +COPY procurement_agent/pyproject.toml /app/procurement_agent/pyproject.toml + +WORKDIR /app/procurement_agent + +# Install the required Python packages using uv +RUN uv pip install --system . + +# Copy the project code +COPY procurement_agent/project /app/procurement_agent/project + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/examples/demos/procurement_agent/README.md b/examples/demos/procurement_agent/README.md new file mode 100644 index 00000000..878c2ec3 --- /dev/null +++ b/examples/demos/procurement_agent/README.md @@ -0,0 +1,412 @@ +# Procurement Agent Demo + +A demonstration of long-running, autonomous AI agents using **Temporal** and **AgentEx**. This agent manages construction procurement workflows that can run for months, respond to external events, and escalate to humans when needed. + +## What This Demo Shows + +This demo illustrates a **procurement manager for building construction** that: + +- **Runs for months or years** - Temporal workflows enable truly persistent agents +- **Responds to external events** - Not just human input, but signals from the real world (shipments, inspections, etc.) 
+- **Escalates to humans when needed** - Waits indefinitely for human decisions on critical issues +- **Learns from experience** - Remembers past human decisions and applies them to similar situations +- **Manages complex state** - Uses a database to track construction schedules and procurement items + +### Key Concepts + +**Long-Running Workflows**: Thanks to Temporal, the agent can live for months, surviving restarts and failures while maintaining full context. + +**External Event Integration**: The agent receives real-world signals (not just user messages) via Temporal signals and takes autonomous actions. + +**Human-in-the-Loop**: The agent can pause execution indefinitely (up to 24 hours) while waiting for human approval on critical decisions. + +**Learning System**: When a human makes a decision, the agent extracts learnings and applies them to future similar situations. + +**State Management**: Uses SQLite to persist construction schedules and procurement item status, providing queryable visibility into current operations without parsing conversation history. + +**Automatic Summarization**: When conversation history exceeds token limits (~40k tokens), the agent automatically summarizes older messages while preserving recent context, enabling indefinite conversation length. + +## Example Workflow + +Here's what happens when items move through the procurement pipeline: + +1. **Submittal Approved** → Agent issues purchase order and creates tracking record +2. **Shipment Departed Factory** → Agent ingests ETA and checks for schedule conflicts +3. **Shipment Arrived Site** → Agent notifies team and schedules quality inspection +4. **Inspection Failed** → Agent escalates to human with recommended action +5. **Human Decision** → Agent learns from the decision for next time + +## Running the Demo + +### Prerequisites + +You'll need three terminals running: + +1. **AgentEx Backend** (database, Temporal server, etc.) +2. 
**AgentEx UI** (web interface at localhost:3000) +3. **Procurement Agent** (this demo) + +### Step 1: Start AgentEx Backend + +From the `scale-agentex` repository: + +```bash +make dev +``` + +This starts all required services (Postgres, Temporal, Redis, etc.) via Docker Compose. Verify everything is healthy: + +```bash +# Optional: Use lazydocker for a better view +lzd +``` + +You should see Temporal UI at: http://localhost:8080 + +### Step 2: Start AgentEx Web UI + +From the `scale-agentex-web` repository: + +```bash +make dev +``` + +The UI will be available at: http://localhost:3000 + +### Step 3: Run the Procurement Agent + +From this directory (`examples/demos/procurement_agent`): + +```bash +# Install dependencies +uv sync + +# Run the agent +export ENVIRONMENT=development && uv run agentex agents run --manifest manifest.yaml +``` + +The agent will start and register with the AgentEx backend on port 8000. + +### Step 4: Create a Task + +Go to http://localhost:3000 and: + +1. Create a new task for the `procurement-agent` +2. Send a message like "Hello" to initialize the workflow +3. Note the **Workflow ID** from the Temporal UI at http://localhost:8080 + +### Step 5: Send Test Events + +Now simulate real-world procurement events: + +```bash +# Navigate to the scripts directory +cd project/scripts + +# Send events (you'll be prompted for the workflow ID) +uv run send_test_events.py + +# Or provide the workflow ID directly +uv run send_test_events.py +``` + +The script sends a series of events simulating the procurement lifecycle for multiple items: +- Steel Beams (passes inspection) +- HVAC Units (fails inspection - agent escalates) +- Windows (passes inspection) +- Flooring Materials (passes inspection) +- Electrical Panels (fails inspection - agent applies learnings) + +### Step 6: Observe the Agent + +Watch the agent in action: + +1. **AgentEx UI** (http://localhost:3000) - See agent responses and decisions +2. 
**Temporal UI** (http://localhost:8080) - View workflow execution, signals, and state +3. **Terminal** - Watch agent logs for detailed operation info + +When an inspection fails, the agent will: +- Analyze the situation +- Recommend an action +- Wait for your response in the AgentEx UI +- Learn from your decision for future similar situations + +## Project Structure + +``` +procurement_agent/ +├── project/ +│ ├── acp.py # ACP server & event handlers +│ ├── workflow.py # Main Temporal workflow logic +│ ├── run_worker.py # Temporal worker setup +│ ├── agents/ +│ │ ├── procurement_agent.py # Main AI agent with procurement tools +│ │ ├── extract_learnings_agent.py # Extracts learnings from human decisions +│ │ └── summarization_agent.py # Summarizes conversation history +│ ├── activities/ +│ │ └── activities.py # Temporal activities (POs, inspections, schedules) +│ ├── data/ +│ │ ├── database.py # SQLite operations +│ │ └── procurement.db # Persistent storage (auto-created) +│ ├── models/ +│ │ └── events.py # Event type definitions (Pydantic models) +│ ├── scripts/ +│ │ └── send_test_events.py # Event simulation script +│ └── utils/ +│ ├── learning_extraction.py # Utilities for extracting context from conversations +│ └── summarization.py # Token counting and summarization logic +├── manifest.yaml # Agent configuration +├── Dockerfile # Container definition +└── pyproject.toml # Dependencies (uv) +``` + +## How It Works + +### 1. Event-Driven Architecture + +The agent receives events via Temporal signals in `workflow.py`: + +```python +@workflow.signal +async def send_event(self, event: str) -> None: + # Validate and queue the event + await self.event_queue.put(event) +``` + +Events are validated against Pydantic models and processed by the AI agent. + +### 2. 
Human-in-the-Loop Pattern + +Critical decisions require human approval via the `wait_for_human` tool in `procurement_agent.py`: + +```python +@function_tool +async def wait_for_human(recommended_action: str) -> str: + """ + Pause execution until human provides input. + Waits up to 24 hours for response. + """ + await workflow.wait_condition( + lambda: not workflow_instance.human_queue.empty(), + timeout=timedelta(hours=24), + ) + # ... return human response +``` + +The workflow continues only after receiving human input through the AgentEx UI. + +### 3. State Management + +Instead of cramming everything into the LLM context window, the agent uses SQLite to manage: + +- **Master construction schedule** (delivery dates, buffer days, requirements) +- **Procurement items** (status, ETAs, purchase orders, inspection results) + +The database is accessed through Temporal activities with proper error handling and retry policies. + +### 4. Learning System + +When humans make decisions, the agent extracts learnings in `extract_learnings_agent.py`: + +```python +# After human input, extract the learning +extraction_result = await Runner.run(extract_agent, new_context, hooks=hooks) +learning = extraction_result.final_output + +# Store in workflow state for future reference +self.human_input_learnings.append(learning) +``` + +These learnings are passed into the agent's system prompt on subsequent runs. + +### 5. Automatic Summarization + +For long-running workflows, conversation history can grow unbounded. 
The agent automatically manages context using intelligent summarization: + +```python +# After each turn, check if summarization is needed +if should_summarize(self._state.input_list): + # Find messages to summarize (preserves last 10 turns, starts after previous summary) + messages_to_summarize, start_index, end_index = get_messages_to_summarize( + self._state.input_list, + last_summary_index + ) + + # Generate summary with dedicated agent + summary_agent = new_summarization_agent() + summary_result = await Runner.run(summary_agent, messages_to_summarize, hooks=hooks) + + # Replace summarized portion with compact summary + self._state.input_list = apply_summary_to_input_list(...) +``` + +Key features: +- **Token threshold**: Triggers at ~40k tokens to stay within model limits +- **Preserves recent context**: Always keeps last 10 user turns in full detail +- **Never re-summarizes**: Starts after the most recent summary to avoid information loss +- **Dedicated summarization agent**: GPT-4o agent focused on extracting key procurement events, decisions, and current state + +This enables workflows to run indefinitely without hitting context limits. + +### 6. Error Handling & Retries + +The workflow uses Temporal's retry policies for resilient execution: + +```python +retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, # Exponential backoff + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=[ + "DataCorruptionError", + "ScheduleNotFoundError", + ] +) +``` + +Activities automatically retry on transient failures but fail fast on data corruption. 
+ +## Key Features + +### Durability +- Workflows survive process restarts, crashes, and deployments +- All state is persisted in Temporal and SQLite +- No context is lost even after months of runtime + +### External Event Processing +- Responds to events from external systems (ERP, logistics, QA) +- Validates and processes events asynchronously +- Multiple event types supported (approvals, shipments, inspections) + +### Human Escalation +- Automatically escalates critical issues (schedule delays, inspection failures) +- Provides recommended actions to humans +- Waits indefinitely (up to 24 hours) for human response +- Continues workflow after receiving guidance + +### Learning & Adaptation +- Extracts patterns from human decisions +- Applies learned rules to similar future situations +- Becomes more autonomous over time +- Human maintains oversight and final authority + +### Observability +- Full workflow history in Temporal UI +- Real-time agent responses in AgentEx UI +- Detailed logging for debugging +- Database audit trail for all changes + +## Customizing the Demo + +### Modify the Construction Schedule + +Edit the default schedule in `project/data/database.py`: + +```python +DEFAULT_SCHEDULE = { + "project": { + "name": "Small Office Renovation", + "start_date": "2026-02-01", + "end_date": "2026-05-31" + }, + "deliveries": [ + { + "item": "Steel Beams", + "required_by": "2026-02-15", + "buffer_days": 5 + }, + # ... add more items + ] +} +``` + +### Add New Event Types + +1. Define the event in `project/models/events.py` +2. Update event validation in `workflow.py` +3. Teach the agent how to handle it in `procurement_agent.py` +4. 
Add test events in `project/scripts/send_test_events.py` + +### Change Agent Behavior + +Modify the agent's instructions in `project/agents/procurement_agent.py`: + +```python +def new_procurement_agent(master_construction_schedule: str, human_input_learnings: list) -> Agent: + instructions = f""" + You are a procurement agent for a commercial building construction project. + + [Your custom instructions here...] + """ + # ... +``` + +### Add New Tools + +Create new activities in `project/activities/activities.py` and register them as tools: + +```python +@activity.defn(name="my_custom_activity") +async def my_custom_activity(param: str) -> str: + # ... your logic + return result + +# Register in the agent +tools=[ + openai_agents.workflow.activity_as_tool( + my_custom_activity, + start_to_close_timeout=timedelta(minutes=10) + ), + # ... other tools +] +``` + +## Troubleshooting + +**Agent not appearing in UI** +- Verify agent is running on port 8000: `lsof -i :8000` +- Check `ENVIRONMENT=development` is set +- Review agent logs for errors + +**Events not being received** +- Confirm workflow ID is correct (check Temporal UI) +- Verify Temporal server is running: `docker ps | grep temporal` +- Check that send_test_events.py is using the right workflow ID + +**Human escalation timeout** +- The agent waits 24 hours for human input before timing out +- Respond in the AgentEx UI task thread +- Check that your message is being sent to the correct task + +**Database errors** +- The database is automatically created at `project/data/procurement.db` +- Delete the file to reset: `rm project/data/procurement.db` +- The agent will recreate it on next run + +**Import errors** +- Make sure dependencies are installed: `uv sync` +- Verify you're running from the correct directory +- Check Python version is 3.12+ + +## What's Next? + +This demo shows the foundation for autonomous, long-running agents. 
Potential applications include: + +- **Supply chain management** - Track orders, shipments, and inventory across months +- **Compliance workflows** - Monitor regulatory requirements and schedule audits +- **Customer success** - Proactive outreach based on usage patterns and lifecycle stage +- **Infrastructure management** - React to alerts, coordinate maintenance, escalate outages +- **Financial processes** - Invoice approval workflows, budget tracking, expense management + +The key insight: **AI agents don't just answer questions—they can run real-world processes autonomously over time.** + +## Learn More + +- [AgentEx Documentation](https://agentex.sgp.scale.com/docs/) +- [Temporal Documentation](https://docs.temporal.io/) +- [OpenAI Agents SDK](https://github.com/openai/agents-sdk) + +--- + +**Questions or issues?** Open an issue on the [scale-agentex GitHub repository](https://github.com/scaleapi/scale-agentex). diff --git a/examples/demos/procurement_agent/dev.ipynb b/examples/demos/procurement_agent/dev.ipynb new file mode 100644 index 00000000..53b70d15 --- /dev/null +++ b/examples/demos/procurement_agent/dev.ipynb @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"procurement-agent\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. 
For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " 
print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/examples/demos/procurement_agent/environments.yaml b/examples/demos/procurement_agent/environments.yaml new file mode 100644 index 00000000..90f44ae6 --- /dev/null +++ b/examples/demos/procurement_agent/environments.yaml @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. 
However, this can be used to override the derived +# # namespace and deploy it with in the same namespace that already exists for a separate agent. +# namespace: "team-procurement-agent" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/demos/procurement_agent/manifest.yaml b/examples/demos/procurement_agent/manifest.yaml new file mode 100644 index 00000000..ad815e20 --- /dev/null +++ b/examples/demos/procurement_agent/manifest.yaml @@ -0,0 +1,144 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. 
The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - procurement_agent + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: procurement_agent/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: procurement_agent/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + # Path to temporal worker file + # Examples: + # project/run_worker.py (standard) + # workers/temporal.py (custom structure) + # ../shared/worker.py (shared across projects) + worker: project/run_worker.py + + +# Agent Configuration +# ----------------- +agent: + # Type of agent - either sync or async + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: procurement-agent + + # Description of what your agent does + # Helps with documentation and discovery + description: An Agentex agent that manages procurement for building 
constructions + + # Temporal workflow configuration + # This enables your agent to run as a Temporal workflow for long-running tasks + temporal: + enabled: true + workflows: + # Name of the workflow class + # Must match the @workflow.defn name in your workflow.py + - name: procurement-agent + + # Queue name for task distribution + # Used by Temporal to route tasks to your agent + # Convention: _task_queue + queue_name: procurement_agent_queue + + # Optional: Health check port for temporal worker + # Defaults to 80 if not specified + # health_check_port: 80 + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + # - env_var_name: OPENAI_API_KEY + # secret_name: openai-api-key + # secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: + OPENAI_API_KEY: "" + # OPENAI_BASE_URL: "" + OPENAI_ORG_ID: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: + - name: my-registry-secret # Update with your image pull secret name + + # Global deployment settings that apply to all clusters + # These can be overridden using --override-file with custom configuration files + global: + agent: + name: "procurement-agent" + description: "An Agentex agent that manages procurement for building constructions" + + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/examples/demos/procurement_agent/project/__init__.py 
b/examples/demos/procurement_agent/project/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/demos/procurement_agent/project/acp.py b/examples/demos/procurement_agent/project/acp.py new file mode 100644 index 00000000..54cac94a --- /dev/null +++ b/examples/demos/procurement_agent/project/acp.py @@ -0,0 +1,59 @@ +import os +import sys + +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor + +# === DEBUG SETUP (AgentEx CLI Debug Support) === +if os.getenv("AGENTEX_DEBUG_ENABLED") == "true": + try: + import debugpy + + from agentex.lib.utils.logging import make_logger + + logger = make_logger(__name__) + debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679")) + debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp") + wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true" + + # Configure debugpy + debugpy.configure(subProcess=False) + debugpy.listen(debug_port) + + logger.info(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}") + + if wait_for_attach: + logger.info(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...") + debugpy.wait_for_client() + logger.info(f"✅ [{debug_type.upper()}] Debugger attached!") + else: + logger.info(f"📡 [{debug_type.upper()}] Ready for debugger attachment") + + except ImportError: + print("❌ debugpy not available. 
Install with: pip install debugpy") + sys.exit(1) + except Exception as e: + print(f"❌ Debug setup failed: {e}") + sys.exit(1) +# === END DEBUG SETUP === + +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +context_interceptor = ContextInterceptor() +streaming_model_provider = TemporalStreamingModelProvider() + +# Create the ACP server +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)], + interceptors=[context_interceptor] + ) +) \ No newline at end of file diff --git a/examples/demos/procurement_agent/project/activities/__init__.py b/examples/demos/procurement_agent/project/activities/__init__.py new file mode 100644 index 00000000..8c8e7bd5 --- /dev/null +++ b/examples/demos/procurement_agent/project/activities/__init__.py @@ -0,0 +1 @@ +"""Procurement agent activities module.""" diff --git a/examples/demos/procurement_agent/project/activities/activities.py b/examples/demos/procurement_agent/project/activities/activities.py new file mode 100644 index 00000000..f2581e14 --- /dev/null +++ b/examples/demos/procurement_agent/project/activities/activities.py @@ -0,0 +1,570 @@ +from __future__ import annotations + +import json +import uuid +import asyncio +from datetime import datetime, timedelta + +from temporalio import activity +from temporalio.exceptions import ApplicationError + +from project.data.database import ( + DatabaseError, + DataCorruptionError, + create_procurement_item, + delete_procurement_item, + update_procurement_item, + get_all_procurement_items, + get_schedule_for_workflow, + create_schedule_for_workflow, + get_procurement_item_by_name, + remove_delivery_item_for_workflow, + update_project_end_date_for_workflow, + update_delivery_date_for_item_for_workflow, +) +from project.models.events import ( + 
SubmitalApprovalEvent, + ShipmentDepartedFactoryEvent, +) +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + +@activity.defn +async def issue_purchase_order(event: SubmitalApprovalEvent) -> str: + """ + Issues a purchase order for construction materials. + + Call this when: + - A submittal is approved (Submittal_Approved event) + - Human feedback requests reissuing a purchase order + """ + uuid_purchase_order = str(uuid.uuid4()) + # wait for 5 seconds as if we were calling an API to issue a purchase order + await asyncio.sleep(5) + logger.info(f"Issuing purchase order: {event}") + logger.info(f"Purchase order ID: {uuid_purchase_order}") + + return f"Successfully issued purchase order with ID: {uuid_purchase_order}" + +@activity.defn +async def flag_potential_issue(event: ShipmentDepartedFactoryEvent) -> str: + """ + Flags a potential issue with a delivery date. + + Call this when: + - A shipment departure creates timeline concerns (Shipment_Departed_Factory event) + - When ETA = required date and there is zero buffer + - Human feedback identifies a potential delivery issue + """ + logger.info(f"Flagging potential issue: {event}") + logger.info(f"Potential issue flagged with delivery date: {event.eta}") + # imagine this is a call to an API to flag a potential issue, perhaps a notification to a team member + await asyncio.sleep(1) + return f"Potential issue flagged with delivery date: {event.eta}" + +@activity.defn +async def notify_team_shipment_arrived(event: ShipmentDepartedFactoryEvent) -> str: + """ + Notifies the team that a shipment has arrived. 
+ + Call this when: + - A shipment arrives at the site (Shipment_Arrived_Site event) + - Human feedback requests team notification + """ + logger.info(f"Notifying team that shipment has arrived: {event.item}") + logger.info(f"Team notification sent for arrival of: {event.item}") + # imagine this is a call to an API to notify the team that a shipment has arrived, perhaps a notification to a team member + await asyncio.sleep(1) + + return f"Notifying team that shipment has arrived: {event.item}" + +@activity.defn +async def schedule_inspection(event: ShipmentDepartedFactoryEvent) -> str: + """ + Schedules an inspection for delivered materials. + + Call this when: + - A shipment arrives at the site (Shipment_Arrived_Site event) + - Human feedback requests scheduling an inspection + """ + inspection_date = datetime.now() + timedelta(days=1) + logger.info(f"Scheduling inspection for: {event.item} on {inspection_date}") + # imagine this is a call to an API to schedule an inspection + await asyncio.sleep(1) + return f"Scheduling inspection for {event.item} on {inspection_date}" + + + +@activity.defn +async def create_master_construction_schedule(workflow_id: str) -> str: + """ + Creates the master construction schedule for the workflow. 
+ + Call this when: + - The workflow is created + + Args: + workflow_id: The Temporal workflow ID + + Raises: + ApplicationError: Non-retryable if data is invalid + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Creating master construction schedule for workflow: {workflow_id}") + + try: + await create_schedule_for_workflow(workflow_id) + return "Master construction schedule created for workflow" + + except DataCorruptionError as e: + # Application error - invalid data, don't retry + logger.error(f"Data corruption error creating schedule: {e}") + raise ApplicationError( + f"Invalid data creating schedule: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error creating schedule (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error creating schedule: {e}") + raise + +@activity.defn +async def get_master_construction_schedule(workflow_id: str) -> str: + """ + Gets the master construction schedule for the workflow. 
+ + Call this when: + - You want to get the master construction schedule for the workflow + - Human feedback requests the master construction schedule + + Returns: + The master construction schedule for the workflow as JSON string + + Raises: + ApplicationError: Non-retryable if schedule not found or data corrupted + DatabaseError: Retryable if database connection fails + """ + try: + schedule = await get_schedule_for_workflow(workflow_id) + + if schedule is None: + # Schedule not found - this is an application error + logger.error(f"No schedule found for workflow {workflow_id}") + raise ApplicationError( + f"No master construction schedule found for workflow {workflow_id}", + type="ScheduleNotFoundError", + non_retryable=True + ) + + logger.info(f"Master construction schedule found for workflow: {workflow_id}") + return json.dumps(schedule) + + except ApplicationError: + # Re-raise application errors + raise + + except DataCorruptionError as e: + # Application error - corrupted data, don't retry + logger.error(f"Data corruption error retrieving schedule: {e}") + raise ApplicationError( + f"Schedule data corrupted: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error retrieving schedule (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error retrieving schedule: {e}") + raise + +@activity.defn +async def update_delivery_date_for_item(workflow_id: str, item: str, new_delivery_date: str) -> str: + """ + Updates the delivery date for a specific item in the construction schedule. 
+ + Call this when: + - You want to update the delivery date for a specific item in the construction schedule + - Human feedback requests updating the delivery date for a specific item + + Args: + workflow_id: The Temporal workflow ID + item: The item to update + new_delivery_date: The new delivery date + + Raises: + ApplicationError: Non-retryable if schedule/item not found + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Updating delivery date for item: {item} to {new_delivery_date}") + + try: + await update_delivery_date_for_item_for_workflow(workflow_id, item, new_delivery_date) + return f"Delivery date updated for item: {item} to {new_delivery_date}" + + except DataCorruptionError as e: + # Application error - schedule or item not found, don't retry + logger.error(f"Data corruption error updating delivery date: {e}") + raise ApplicationError( + f"Failed to update delivery date: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error updating delivery date (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error updating delivery date: {e}") + raise + +@activity.defn +async def remove_delivery_item(workflow_id: str, item: str) -> str: + """ + Removes a delivery item from the construction schedule. 
+ + Call this when: + - You want to remove a delivery item from the construction schedule + - Human feedback requests removing a delivery item + + Args: + workflow_id: The Temporal workflow ID + item: The item to remove + + Raises: + ApplicationError: Non-retryable if schedule/item not found + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Removing delivery item: {item}") + + try: + await remove_delivery_item_for_workflow(workflow_id, item) + return f"Delivery item removed from construction schedule: {item}" + + except DataCorruptionError as e: + # Application error - schedule or item not found, don't retry + logger.error(f"Data corruption error removing delivery item: {e}") + raise ApplicationError( + f"Failed to remove delivery item: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error removing delivery item (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error removing delivery item: {e}") + raise + +@activity.defn +async def update_project_end_date(workflow_id: str, new_end_date: str) -> str: + """ + Updates the end date for the project in the construction schedule. 
+ + Call this when: + - You want to update the end date for the project in the construction schedule + - Human feedback requests updating the end date for the project + + Args: + workflow_id: The Temporal workflow ID + new_end_date: The new end date for the project + + Raises: + ApplicationError: Non-retryable if schedule not found + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Updating end date for project to: {new_end_date}") + + try: + await update_project_end_date_for_workflow(workflow_id, new_end_date) + return f"End date updated for project: {new_end_date}" + + except DataCorruptionError as e: + # Application error - schedule not found, don't retry + logger.error(f"Data corruption error updating project end date: {e}") + raise ApplicationError( + f"Failed to update project end date: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error updating project end date (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error updating project end date: {e}") + raise + + +@activity.defn +async def create_procurement_item_activity( + workflow_id: str, + item: str, + status: str, + eta: str | None = None, + date_arrived: str | None = None, + purchase_order_id: str | None = None +) -> str: + """ + Creates a new procurement item for tracking through the workflow. 
+ + Call this when: + - A submittal is approved (Submittal_Approved event) - automatically after submittal approval + - Human feedback requests creating a new procurement item + + Args: + workflow_id: The Temporal workflow ID + item: The item name (e.g., "Steel Beams") + status: Current status of the item (e.g., "submittal_approved") + eta: Optional estimated time of arrival + date_arrived: Optional date the item arrived + purchase_order_id: Optional purchase order ID + + Raises: + ApplicationError: Non-retryable if data is invalid + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Creating procurement item for workflow {workflow_id}: {item} with status {status}") + + try: + await create_procurement_item( + workflow_id=workflow_id, + item=item, + status=status, + eta=eta, + date_arrived=date_arrived, + purchase_order_id=purchase_order_id + ) + return f"Procurement item created: {item} with status {status}" + + except DataCorruptionError as e: + # Application error - invalid data, don't retry + logger.error(f"Data corruption error creating procurement item: {e}") + raise ApplicationError( + f"Invalid data creating procurement item: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error creating procurement item (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error creating procurement item: {e}") + raise + + +@activity.defn +async def update_procurement_item_activity( + workflow_id: str, + item: str, + status: str | None = None, + eta: str | None = None, + date_arrived: str | None = None, + purchase_order_id: str | None = None +) -> str: + """ + Updates a procurement item's fields. 
+ + Call this when: + - Any event occurs that changes the item's status (e.g., shipment departed, arrived, inspection scheduled/failed/passed) + - Human feedback requests updating the procurement item + - Purchase order is issued + - ETA is updated + - Item arrives at site + + Args: + workflow_id: The Temporal workflow ID + item: The item name (e.g., "Steel Beams") + status: Optional new status + eta: Optional new estimated time of arrival + date_arrived: Optional new arrival date + purchase_order_id: Optional new purchase order ID + + Raises: + ApplicationError: Non-retryable if workflow_id invalid or item not found + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Updating procurement item for workflow {workflow_id}: {item}") + + try: + await update_procurement_item( + workflow_id=workflow_id, + item=item, + status=status, + eta=eta, + date_arrived=date_arrived, + purchase_order_id=purchase_order_id + ) + return f"Procurement item updated for workflow {workflow_id}: {item}" + + except DataCorruptionError as e: + # Application error - item not found or invalid data, don't retry + logger.error(f"Data corruption error updating procurement item: {e}") + raise ApplicationError( + f"Failed to update procurement item: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error updating procurement item (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error updating procurement item: {e}") + raise + + +@activity.defn +async def delete_procurement_item_activity(workflow_id: str, item: str) -> str: + """ + Deletes a procurement item from the database. 
+ + Call this when: + - Human feedback explicitly requests removing/deleting an item (e.g., "remove the steel beams") + - Item is no longer needed in the project + + Args: + workflow_id: The Temporal workflow ID + item: The item name (e.g., "Steel Beams") + + Raises: + ApplicationError: Non-retryable if workflow_id invalid or item not found + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Deleting procurement item for workflow {workflow_id}: {item}") + + try: + await delete_procurement_item(workflow_id, item) + return f"Procurement item deleted for workflow {workflow_id}: {item}" + + except DataCorruptionError as e: + # Application error - item not found, don't retry + logger.error(f"Data corruption error deleting procurement item: {e}") + raise ApplicationError( + f"Failed to delete procurement item: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error deleting procurement item (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error deleting procurement item: {e}") + raise + + +@activity.defn +async def get_procurement_item_by_name_activity(workflow_id: str, item: str) -> str: + """ + Retrieves a procurement item by workflow ID and item name. 
+ + Call this when: + - You need to check the status of a specific item + - You need context about an item before making decisions + - Human feedback requests information about a specific item + + Args: + workflow_id: The Temporal workflow ID + item: The item name (e.g., "Steel Beams") + + Returns: + JSON string of the procurement item or message if not found + + Raises: + ApplicationError: Non-retryable if input data is invalid + DatabaseError: Retryable if database connection fails + """ + logger.info(f"Getting procurement item for workflow {workflow_id}: {item}") + + try: + result = await get_procurement_item_by_name(workflow_id, item) + + if result is None: + return f"No procurement item found for workflow {workflow_id} with item name: {item}" + + return json.dumps(result) + + except DataCorruptionError as e: + # Application error - invalid input, don't retry + logger.error(f"Data corruption error getting procurement item: {e}") + raise ApplicationError( + f"Invalid input getting procurement item: {e}", + type="DataCorruptionError", + non_retryable=True + ) from e + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error getting procurement item (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error getting procurement item: {e}") + raise + + +@activity.defn +async def get_all_procurement_items_activity() -> str: + """ + Retrieves all procurement items from the database. 
+ + Call this when: + - You need an overview of all procurement items + - You need to check the status of multiple items + - Human feedback requests a summary of all items + + Returns: + JSON string of all procurement items + + Raises: + DatabaseError: Retryable if database connection fails + """ + logger.info("Getting all procurement items") + + try: + results = await get_all_procurement_items() + return json.dumps(results) + + except DatabaseError as e: + # Platform error - database connection issue, let Temporal retry + logger.warning(f"Database error getting all procurement items (will retry): {e}") + raise # Let Temporal retry with activity retry policy + + except Exception as e: + # Unexpected error - log and let Temporal retry + logger.error(f"Unexpected error getting all procurement items: {e}") + raise \ No newline at end of file diff --git a/examples/demos/procurement_agent/project/agents/__init__.py b/examples/demos/procurement_agent/project/agents/__init__.py new file mode 100644 index 00000000..08d7078b --- /dev/null +++ b/examples/demos/procurement_agent/project/agents/__init__.py @@ -0,0 +1 @@ +"""Procurement agent agents module.""" diff --git a/examples/demos/procurement_agent/project/agents/extract_learnings_agent.py b/examples/demos/procurement_agent/project/agents/extract_learnings_agent.py new file mode 100644 index 00000000..ca6ea680 --- /dev/null +++ b/examples/demos/procurement_agent/project/agents/extract_learnings_agent.py @@ -0,0 +1,53 @@ +"""Agent for extracting learnings from human interactions.""" + + +from agents import Agent + + +def new_extract_learnings_agent() -> Agent: + """ + Create an agent that extracts 1-2 sentence learnings from human interactions. + + This agent analyzes the full conversation context to understand how we got to + the human interaction and what key insight or decision was made. 
+ + Returns: + Agent configured to extract a concise learning + """ + instructions = """ +You are a learning extraction agent for a procurement system. + +Your job is to analyze only the wait_for_human tool call OUTPUT and extract a concise 1-2 sentence learning that can be applied to future decisions. +We care about the output as that is what the human actually said. The input is AI generated, we are trying to extract what decision the human made. + +For example: + + Example usage from the conversation: + { + "arguments": "{\"recommended_action\":\"\"The inspection failed I recommend we re-order the item.\"\"}", + "call_id": "call_FqWa25mlCKwo8gA3zr4TwHca", + "name": "wait_for_human", + "type": "function_call", + "id": "fc_08a992817d632789006914d90bbb948194bd20eb784f33c2a5", + "status": "completed" + } + + Human response received: + { + "call_id": "call_FqWa25mlCKwo8gA3zr4TwHca", + "output": "No, we should not re-order the item. Please remove the item from the master schedule.", + "type": "function_call_output" + } +Learning: When we fail inspection, the recommended action is to remove the item from the master schedule. + +The rest of the information is just context but the focus should be on understanding what the human wanted to do and why. + +Please extract a 1-2 sentence learning from the wait_for_human tool call. 
+""" + + return Agent( + name="Extract Learnings Agent", + instructions=instructions, + model="gpt-4o", + tools=[], # No tools needed - just analysis + ) diff --git a/examples/demos/procurement_agent/project/agents/procurement_agent.py b/examples/demos/procurement_agent/project/agents/procurement_agent.py new file mode 100644 index 00000000..475cd83e --- /dev/null +++ b/examples/demos/procurement_agent/project/agents/procurement_agent.py @@ -0,0 +1,502 @@ +"""Event agent for processing procurement events and taking actions.""" +from __future__ import annotations + +from datetime import datetime, timedelta + +from agents import Agent, function_tool +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.contrib import openai_agents +from temporalio.exceptions import TimeoutError, ApplicationError + +from project.activities.activities import ( + schedule_inspection, + flag_potential_issue, + issue_purchase_order, + remove_delivery_item, + update_project_end_date, + notify_team_shipment_arrived, + update_delivery_date_for_item, + create_procurement_item_activity, + delete_procurement_item_activity, + update_procurement_item_activity, + get_all_procurement_items_activity, + get_procurement_item_by_name_activity, +) + + +@function_tool +async def wait_for_human(recommended_action: str) -> str: + """ + When the we are stuck and need to ask a human for help, call this tool. Please provide a recommended action to the human. + Until the human approves the recommended action, you will keep calling this tool (call it as many times as needed). + If the human says anything other than yes, please use this tool again and come up with a new recommended action. + If the human wants to add additional information, please use this tool again and come up with a new recommended action. + You are almost always calling this tool again unless the human approves the exact recommended action. 
+ + For example: + + Assistant recommendation: The inspection failed I recommend we re-order the item. + Human response: No, we should not re-order the item. Please remove the item from the master schedule. + Assistant recommendation: Ok I will go ahead and remove the item from the master schedule. Do you approve? + Human response: Yes + + Assistant recommendation: The inspection failed I recommend we re-order the item. + Human response: Yes and also please update the master schedule to reflect the new delivery date. + Assistant recommendation: Ok I will go ahead and update the master schedule to reflect the new delivery date and re-order the item. Does that sound right? + Human response: Yes + """ + workflow_instance = workflow.instance() + workflow.logger.info(f"Recommended action: {recommended_action}") + + try: + # Wait for human response with 24-hour timeout (don't wait forever!) + await workflow.wait_condition( + lambda: not workflow_instance.human_queue.empty(), + timeout=timedelta(hours=24), + ) + + while not workflow_instance.human_queue.empty(): + human_input = await workflow_instance.human_queue.get() + print(f"[WORKFLOW] Processing human message from queue") + return human_input + + # If queue became empty after wait_condition succeeded, this shouldn't normally happen + workflow.logger.warning("Queue empty after wait condition succeeded") + return "No human response available" + + except TimeoutError: + # Human didn't respond within 24 hours + workflow.logger.warning("Human escalation timed out after 24 hours") + return "TIMEOUT: No human response received within 24 hours. Proceeding with best judgment." + + +@function_tool +async def update_delivery_date_tool(item: str, new_delivery_date: str) -> str: + """ + Updates the delivery date for a specific item in the construction schedule. 
+ + Call this when: + - You want to update the delivery date for a specific item in the construction schedule + - Human feedback requests updating the delivery date for a specific item + + Args: + item: The item to update + new_delivery_date: The new delivery date + + Returns: + Confirmation message or error description + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + update_delivery_date_for_item, + args=[workflow_id, item, new_delivery_date], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (item not found, schedule missing) + workflow.logger.error(f"Failed to update delivery date for {item}: {e}") + return f"Error: Unable to update delivery date for {item}. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error updating delivery date: {e}") + return f"Error: System issue updating delivery date for {item}. Please try again." + + +@function_tool +async def remove_delivery_item_tool(item: str) -> str: + """ + Removes a delivery item from the construction schedule. 
+ + Call this when: + - You want to remove a delivery item from the construction schedule + - Human feedback requests removing a delivery item + + Args: + item: The item to remove + + Returns: + Confirmation message or error description + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + remove_delivery_item, + args=[workflow_id, item], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (item not found, schedule missing) + workflow.logger.error(f"Failed to remove delivery item {item}: {e}") + return f"Error: Unable to remove item {item}. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error removing delivery item: {e}") + return f"Error: System issue removing item {item}. Please try again." + + +@function_tool +async def update_project_end_date_tool(new_end_date: str) -> str: + """ + Updates the end date for the project in the construction schedule. 
+ + Call this when: + - You want to update the end date for the project in the construction schedule + - Human feedback requests updating the end date for the project + + Args: + new_end_date: The new end date for the project + + Returns: + Confirmation message or error description + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + update_project_end_date, + args=[workflow_id, new_end_date], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (schedule not found) + workflow.logger.error(f"Failed to update project end date: {e}") + return f"Error: Unable to update project end date. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error updating project end date: {e}") + return f"Error: System issue updating project end date. Please try again." + + +@function_tool +async def create_procurement_item_tool( + item: str, + status: str, + eta: str | None = None, + date_arrived: str | None = None, + purchase_order_id: str | None = None +) -> str: + """ + Creates a new procurement item for tracking through the workflow. 
+ + Call this when: + - A submittal is approved (after calling issue_purchase_order) + - You need to track a new item in the procurement system + + Args: + item: The item name (e.g., "Steel Beams") + status: Current status (e.g., "submittal_approved", "purchase_order_issued") + eta: Optional estimated time of arrival (ISO format) + date_arrived: Optional date the item arrived (ISO format) + purchase_order_id: Optional purchase order ID + + Returns: + Confirmation message or error description + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + create_procurement_item_activity, + args=[workflow_id, item, status, eta, date_arrived, purchase_order_id], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (invalid data) + workflow.logger.error(f"Failed to create procurement item for {item}: {e}") + return f"Error: Unable to create procurement item for {item}. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error creating procurement item: {e}") + return f"Error: System issue creating procurement item for {item}. Please try again." + + +@function_tool +async def update_procurement_item_tool( + item: str, + status: str | None = None, + eta: str | None = None, + date_arrived: str | None = None, + purchase_order_id: str | None = None +) -> str: + """ + Updates a procurement item's fields in the tracking system. 
+ + Call this when: + - An event changes the item's status (e.g., shipment departed, arrived, inspection scheduled/failed/passed) + - A purchase order is issued for the item + - The ETA is updated + - The item arrives at the site + - A potential issue is flagged + + Args: + item: The item name (e.g., "Steel Beams", "HVAC Units") - REQUIRED to identify which item to update + status: Optional new status (e.g., "purchase_order_issued", "shipment_departed", "shipment_arrived", + "potential_issue_flagged", "inspection_scheduled", "inspection_failed", "inspection_passed") + eta: Optional new estimated time of arrival (ISO format) + date_arrived: Optional new arrival date (ISO format) + purchase_order_id: Optional new purchase order ID + + Returns: + Confirmation message or error description + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + update_procurement_item_activity, + args=[workflow_id, item, status, eta, date_arrived, purchase_order_id], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (item not found) + workflow.logger.error(f"Failed to update procurement item: {e}") + return f"Error: Unable to update procurement item. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error updating procurement item: {e}") + return f"Error: System issue updating procurement item. Please try again." + + +@function_tool +async def delete_procurement_item_tool(item: str) -> str: + """ + Deletes a procurement item from the tracking system. 
+ + Call this when: + - Human explicitly requests removing/deleting an item + - An item is no longer needed in the project + + Args: + item: The item name to delete (e.g., "Steel Beams", "HVAC Units") + + Returns: + Confirmation message or error description + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + delete_procurement_item_activity, + args=[workflow_id, item], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (item not found) + workflow.logger.error(f"Failed to delete procurement item: {e}") + return f"Error: Unable to delete procurement item. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error deleting procurement item: {e}") + return f"Error: System issue deleting procurement item. Please try again." + + +@function_tool +async def get_procurement_item_by_name_tool(item: str) -> str: + """ + Retrieves a procurement item by item name for context. 
+ + Call this when: + - You need to check the status of a specific item before making decisions + - Human asks about the status of an item + - You need additional context about an item + + Args: + item: The item name (e.g., "Steel Beams") + + Returns: + JSON string of the procurement item or message if not found + """ + workflow_id = workflow.info().workflow_id + + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + maximum_interval=timedelta(seconds=120), + maximum_attempts=5, + non_retryable_error_types=["DataCorruptionError"], + ) + + try: + return await workflow.execute_activity( + get_procurement_item_by_name_activity, + args=[workflow_id, item], + start_to_close_timeout=timedelta(minutes=5), + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (invalid input) + workflow.logger.error(f"Failed to get procurement item {item}: {e}") + return f"Error: Unable to get procurement item {item}. {e.message}" + except Exception as e: + # Unexpected error + workflow.logger.error(f"Unexpected error getting procurement item: {e}") + return f"Error: System issue getting procurement item {item}. Please try again." + + +@function_tool +async def get_all_procurement_items_tool() -> str: + """ + Retrieves all procurement items for context. 
@function_tool
async def get_all_procurement_items_tool() -> str:
    """
    Retrieves all procurement items for context.

    Call this when:
    - You need an overview of all procurement items
    - Human asks for a summary of all items
    - You need to check multiple items' statuses

    Returns:
        JSON string of all procurement items
    """
    # Exponential backoff; data-validation failures are never retried.
    policy = RetryPolicy(
        initial_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        maximum_interval=timedelta(seconds=120),
        maximum_attempts=5,
        non_retryable_error_types=["DataCorruptionError"],
    )

    try:
        return await workflow.execute_activity(
            get_all_procurement_items_activity,
            retry_policy=policy,
            start_to_close_timeout=timedelta(minutes=5),
            schedule_to_close_timeout=timedelta(minutes=10),
        )
    except ApplicationError as e:
        # Non-retryable application failure.
        workflow.logger.error(f"Failed to get all procurement items: {e}")
        return f"Error: Unable to get all procurement items. {e.message}"
    except Exception as e:
        # Anything else is a system-level problem; report it to the agent.
        workflow.logger.error(f"Unexpected error getting all procurement items: {e}")
        return f"Error: System issue getting all procurement items. Please try again."
def new_procurement_agent(master_construction_schedule: str, human_input_learnings: list) -> Agent:
    """
    Create an agent that processes procurement events and takes actions.

    Args:
        master_construction_schedule: Current construction schedule, rendered
            as text for the agent prompt
        human_input_learnings: Past escalations and human decisions

    Returns:
        Agent configured to process events and call tools
    """
    instructions = f"""
You are a procurement agent for a commercial building construction project.

Your role is to monitor procurement events, take appropriate actions, and escalate critical issues to a human with a recommended action.

Please escalate to a human if you feel like we are facing a critical schedule delay and provide a recommended action.

If the user says no or has feedback, please come up with another solution and call the wait_for_human tool again (you can call it as many times as needed).

## Context

Master Construction Schedule:
{master_construction_schedule}

Past Learnings from Escalations:
{human_input_learnings}

Current Date: {datetime.now().isoformat()}



    """
    # NOTE(review): datetime.now() is non-deterministic; if this factory is
    # invoked from workflow code, the timestamp should come from workflow time
    # (e.g. workflow.now()) to keep replays deterministic - confirm caller.

    # Generous timeout: tool activities may wait on slow external systems.
    start_to_close_timeout = timedelta(days=1)

    return Agent(
        name="Procurement Event Agent",
        instructions=instructions,
        model="gpt-4o",
        tools=[
            openai_agents.workflow.activity_as_tool(
                issue_purchase_order, start_to_close_timeout=start_to_close_timeout
            ),
            openai_agents.workflow.activity_as_tool(
                flag_potential_issue, start_to_close_timeout=start_to_close_timeout
            ),
            openai_agents.workflow.activity_as_tool(
                notify_team_shipment_arrived,
                start_to_close_timeout=start_to_close_timeout,
            ),
            openai_agents.workflow.activity_as_tool(
                schedule_inspection, start_to_close_timeout=start_to_close_timeout
            ),
            update_delivery_date_tool,  # function_tool wrapper that injects workflow_id
            remove_delivery_item_tool,  # function_tool wrapper that injects workflow_id
            update_project_end_date_tool,  # function_tool wrapper that injects workflow_id
            create_procurement_item_tool,  # function_tool wrapper for creating procurement items
            update_procurement_item_tool,  # function_tool wrapper for updating procurement items
            delete_procurement_item_tool,  # function_tool wrapper for deleting procurement items
            get_procurement_item_by_name_tool,  # function_tool wrapper for getting a specific procurement item
            get_all_procurement_items_tool,  # function_tool wrapper for getting all procurement items
            wait_for_human,  # function_tool runs in workflow context
        ],
    )
def new_summarization_agent() -> Agent:
    """
    Create an agent that summarizes conversation history for context compression.

    The agent condenses a long conversation into a detailed but concise summary
    that preserves key events, decisions, and current state so the workflow can
    continue with a smaller context window.

    Returns:
        Agent configured to generate conversation summaries
    """
    prompt = """
You are a summarization agent for a procurement workflow system.

Your job is to create a detailed but concise summary of the conversation history.
Focus on information that would be helpful for continuing the conversation, including:

- What procurement events have occurred (submittals, shipments, inspections, etc.)
- What items are being tracked and their current status
- What actions have been taken (purchase orders issued, inspections scheduled, etc.)
- Any critical issues or delays that were identified
- Any human decisions or escalations that occurred
- What is currently being worked on
- What needs to be done next

Your summary should be comprehensive enough to provide full context but concise enough
to be quickly understood. Aim for 3-5 paragraphs organized by topic.

Focus on the OUTCOMES and CURRENT STATE rather than listing every single tool call.

Example format:

**Items Tracked:**
Steel Beams have been approved, purchase order issued (ID: 6c9e401a...), shipment arrived
on 2026-02-10, inspection passed. Currently marked as complete.

**Current Status:**
All items are on schedule with no delays. The workflow is progressing smoothly.

**Next Steps:**
Continue monitoring upcoming deliveries for HVAC Units and Windows.
"""

    # Pure-text summarizer: no tools attached on purpose.
    return Agent(
        name="Summarization Agent",
        instructions=prompt,
        model="gpt-4o",
        tools=[],
    )
+""" + + return Agent( + name="Summarization Agent", + instructions=instructions, + model="gpt-4o", + tools=[], # No tools needed - just summarization + ) diff --git a/examples/demos/procurement_agent/project/data/__init__.py b/examples/demos/procurement_agent/project/data/__init__.py new file mode 100644 index 00000000..ec504c84 --- /dev/null +++ b/examples/demos/procurement_agent/project/data/__init__.py @@ -0,0 +1 @@ +"""Procurement agent data module.""" diff --git a/examples/demos/procurement_agent/project/data/database.py b/examples/demos/procurement_agent/project/data/database.py new file mode 100644 index 00000000..a756f7ef --- /dev/null +++ b/examples/demos/procurement_agent/project/data/database.py @@ -0,0 +1,686 @@ +""" +Database initialization and management for procurement agent. +Stores master construction schedules indexed by workflow ID. +""" +from __future__ import annotations + +import json +from typing import Optional +from pathlib import Path + +import aiosqlite # type: ignore[import-untyped] + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + + +# Custom exceptions for database operations +class DatabaseError(Exception): + """Platform-level database errors (retryable by Temporal)""" + pass + + +class DataCorruptionError(Exception): + """Application-level data errors (non-retryable)""" + pass + +# Database file location (in the data directory) +DB_PATH = Path(__file__).parent / "procurement.db" + +DEFAULT_SCHEDULE = { + "project": { + "name": "Small Office Renovation", + "start_date": "2026-02-01", + "end_date": "2026-05-31" + }, + "deliveries": [ + { + "item": "Steel Beams", + "required_by": "2026-02-15", + "buffer_days": 5 + }, + { + "item": "HVAC Units", + "required_by": "2026-03-01", + "buffer_days": 7 + }, + { + "item": "Windows", + "required_by": "2026-03-15", + "buffer_days": 10 + }, + { + "item": "Flooring Materials", + "required_by": "2026-04-01", + "buffer_days": 3 + }, + { + "item": "Electrical 
Panels", + "required_by": "2026-04-15", + "buffer_days": 5 + } + ] +} + + +async def init_database() -> None: + """ + Initialize the SQLite database and create tables if they don't exist. + Creates the master_construction_schedule and procurement_items tables. + Safe to call multiple times - uses CREATE TABLE IF NOT EXISTS. + + Raises: + DatabaseError: If database initialization fails + """ + logger.info(f"Initializing database at {DB_PATH}") + + try: + async with aiosqlite.connect(DB_PATH) as db: + await db.execute(""" + CREATE TABLE IF NOT EXISTS master_construction_schedule ( + workflow_id TEXT PRIMARY KEY, + project_name TEXT NOT NULL, + project_start_date TEXT NOT NULL, + project_end_date TEXT NOT NULL, + schedule_json TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create index on workflow_id for faster lookups + await db.execute(""" + CREATE INDEX IF NOT EXISTS idx_workflow_id + ON master_construction_schedule(workflow_id) + """) + + # Create procurement_items table for tracking item status through workflow + await db.execute(""" + CREATE TABLE IF NOT EXISTS procurement_items ( + workflow_id TEXT NOT NULL, + item TEXT NOT NULL, + status TEXT NOT NULL, + eta TEXT, + date_arrived TEXT, + purchase_order_id TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (workflow_id, item) + ) + """) + + # Create index on workflow_id for faster lookups + await db.execute(""" + CREATE INDEX IF NOT EXISTS idx_procurement_workflow_id + ON procurement_items(workflow_id) + """) + + await db.commit() + logger.info("Database initialized successfully") + + except aiosqlite.Error as e: + # Fatal error - can't initialize database + logger.error(f"Failed to initialize database: {e}") + raise DatabaseError(f"Failed to initialize database: {e}") from e + except Exception as e: + logger.error(f"Unexpected error during database 
async def create_schedule_for_workflow(
    workflow_id: str,
    schedule: Optional[dict] = None
) -> None:
    """
    Store a construction schedule for one workflow, falling back to
    DEFAULT_SCHEDULE when no custom schedule is supplied.

    Args:
        workflow_id: The Temporal workflow ID
        schedule: Optional custom schedule dict. If None, uses DEFAULT_SCHEDULE

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If schedule data is invalid (non-retryable)
    """
    # Reject bad identifiers up front; retrying cannot fix these.
    if not workflow_id or not isinstance(workflow_id, str):
        raise DataCorruptionError("Invalid workflow_id: must be a non-empty string")

    if schedule is None:
        schedule = DEFAULT_SCHEDULE

    # Structural validation of the schedule payload (non-retryable on failure).
    try:
        if "project" not in schedule:
            raise DataCorruptionError("Schedule missing 'project' key")
        for key in ("name", "start_date", "end_date"):
            if key not in schedule["project"]:
                raise DataCorruptionError(f"Schedule project missing required key: {key}")
    except (TypeError, AttributeError) as e:
        raise DataCorruptionError(f"Invalid schedule structure: {e}") from e

    try:
        # Serialize first so JSON errors surface before touching the database.
        schedule_json = json.dumps(schedule)

        async with aiosqlite.connect(DB_PATH) as conn:
            await conn.execute("""
                INSERT OR REPLACE INTO master_construction_schedule
                (workflow_id, project_name, project_start_date, project_end_date, schedule_json, updated_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """, (
                workflow_id,
                schedule["project"]["name"],
                schedule["project"]["start_date"],
                schedule["project"]["end_date"],
                schedule_json
            ))
            await conn.commit()
            logger.info(f"Created schedule for workflow {workflow_id}")

    except (TypeError, ValueError) as e:
        # Unserializable payload - retrying cannot help.
        logger.error(f"Failed to serialize schedule to JSON: {e}")
        raise DataCorruptionError(f"Schedule data cannot be serialized: {e}") from e

    except aiosqlite.IntegrityError as e:
        # Constraint violation - non-retryable.
        logger.error(f"Data integrity error: {e}")
        raise DataCorruptionError(f"Data integrity error: {e}") from e

    except aiosqlite.Error as e:
        # Connection/lock problems - let Temporal retry.
        logger.warning(f"Database error creating schedule (retryable): {e}")
        raise DatabaseError(f"Failed to create schedule: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error creating schedule: {e}")
        raise DatabaseError(f"Unexpected error creating schedule: {e}") from e
logger.error(f"Failed to serialize schedule to JSON: {e}") + raise DataCorruptionError(f"Schedule data cannot be serialized: {e}") from e + + except aiosqlite.IntegrityError as e: + # Data constraint violation - don't retry + logger.error(f"Data integrity error: {e}") + raise DataCorruptionError(f"Data integrity error: {e}") from e + + except aiosqlite.Error as e: + # Database connection/lock errors - retryable + logger.warning(f"Database error creating schedule (retryable): {e}") + raise DatabaseError(f"Failed to create schedule: {e}") from e + + except Exception as e: + # Unexpected error - treat as retryable + logger.error(f"Unexpected error creating schedule: {e}") + raise DatabaseError(f"Unexpected error creating schedule: {e}") from e + + +async def get_schedule_for_workflow(workflow_id: str) -> Optional[dict]: + """ + Retrieve the construction schedule for a specific workflow. + + Args: + workflow_id: The Temporal workflow ID + + Returns: + The schedule dict or None if not found + + Raises: + DatabaseError: If database operation fails (retryable by Temporal) + DataCorruptionError: If stored JSON is corrupted (non-retryable) + """ + try: + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute(""" + SELECT schedule_json FROM master_construction_schedule + WHERE workflow_id = ? 
async def update_delivery_date_for_item_for_workflow(workflow_id: str, item: str, new_delivery_date: str) -> None:
    """
    Change the required_by date of one delivery item in a workflow's schedule.

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If schedule not found or item not found (non-retryable)
    """
    # May itself raise DatabaseError / DataCorruptionError.
    schedule = await get_schedule_for_workflow(workflow_id)
    if schedule is None:
        logger.error(f"No schedule found for workflow {workflow_id}")
        raise DataCorruptionError(f"No schedule found for workflow {workflow_id}")

    # Locate the matching delivery entry and rewrite its date.
    for delivery in schedule.get("deliveries", []):
        if delivery.get("item") == item:
            delivery["required_by"] = new_delivery_date
            break
    else:
        logger.error(f"Item {item} not found in schedule for workflow {workflow_id}")
        raise DataCorruptionError(f"Item {item} not found in schedule for workflow {workflow_id}")

    # Persist the mutated schedule JSON.
    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            await conn.execute("""
                UPDATE master_construction_schedule
                SET schedule_json = ?, updated_at = CURRENT_TIMESTAMP
                WHERE workflow_id = ?
            """, (json.dumps(schedule), workflow_id))
            await conn.commit()
            logger.info(f"Updated delivery date for item {item} in workflow {workflow_id}")

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error updating delivery date (retryable): {e}")
        raise DatabaseError(f"Failed to update delivery date: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error updating delivery date: {e}")
        raise DatabaseError(f"Unexpected error updating delivery date: {e}") from e
async def remove_delivery_item_for_workflow(workflow_id: str, item: str) -> None:
    """
    Drop one delivery item from a workflow's construction schedule.

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If schedule not found or item not found (non-retryable)
    """
    # May itself raise DatabaseError / DataCorruptionError.
    schedule = await get_schedule_for_workflow(workflow_id)
    if schedule is None:
        logger.error(f"No schedule found for workflow {workflow_id}")
        raise DataCorruptionError(f"No schedule found for workflow {workflow_id}")

    # Filter the item out; unchanged length means it was never there.
    deliveries = schedule.get("deliveries", [])
    remaining = [d for d in deliveries if d.get("item") != item]
    if len(remaining) == len(deliveries):
        logger.error(f"Item {item} not found in schedule for workflow {workflow_id}")
        raise DataCorruptionError(f"Item {item} not found in schedule for workflow {workflow_id}")
    schedule["deliveries"] = remaining

    # Persist the mutated schedule JSON.
    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            await conn.execute("""
                UPDATE master_construction_schedule
                SET schedule_json = ?, updated_at = CURRENT_TIMESTAMP
                WHERE workflow_id = ?
            """, (json.dumps(schedule), workflow_id))
            await conn.commit()
            logger.info(f"Removed delivery item {item} from workflow {workflow_id}")

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error removing delivery item (retryable): {e}")
        raise DatabaseError(f"Failed to remove delivery item: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error removing delivery item: {e}")
        raise DatabaseError(f"Unexpected error removing delivery item: {e}") from e
async def update_project_end_date_for_workflow(workflow_id: str, new_end_date: str) -> None:
    """
    Set a new project end date for a workflow's construction schedule,
    keeping the JSON blob and the dedicated project_end_date column in sync.

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If schedule not found (non-retryable)
    """
    # May itself raise DatabaseError / DataCorruptionError.
    schedule = await get_schedule_for_workflow(workflow_id)
    if schedule is None:
        logger.error(f"No schedule found for workflow {workflow_id}")
        raise DataCorruptionError(f"No schedule found for workflow {workflow_id}")

    schedule["project"]["end_date"] = new_end_date

    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            # Write both representations in a single statement.
            await conn.execute("""
                UPDATE master_construction_schedule
                SET project_end_date = ?, schedule_json = ?, updated_at = CURRENT_TIMESTAMP
                WHERE workflow_id = ?
            """, (new_end_date, json.dumps(schedule), workflow_id))
            await conn.commit()
            logger.info(f"Updated end date for project in workflow {workflow_id}")

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error updating project end date (retryable): {e}")
        raise DatabaseError(f"Failed to update project end date: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error updating project end date: {e}")
        raise DatabaseError(f"Unexpected error updating project end date: {e}") from e
async def create_procurement_item(
    workflow_id: str,
    item: str,
    status: str,
    eta: Optional[str] = None,
    date_arrived: Optional[str] = None,
    purchase_order_id: Optional[str] = None
) -> None:
    """
    Create (or overwrite) a procurement item tracked for a workflow.

    Args:
        workflow_id: The Temporal workflow ID
        item: The item name (e.g., "Steel Beams")
        status: Current status of the item
        eta: Optional estimated time of arrival
        date_arrived: Optional date the item arrived
        purchase_order_id: Optional purchase order ID

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If input data is invalid (non-retryable)
    """
    # Mandatory string fields - invalid input is never retried.
    for field_name, value in (("workflow_id", workflow_id), ("item", item), ("status", status)):
        if not value or not isinstance(value, str):
            raise DataCorruptionError(f"Invalid {field_name}: must be a non-empty string")

    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            # INSERT OR REPLACE keyed on (workflow_id, item).
            await conn.execute("""
                INSERT OR REPLACE INTO procurement_items
                (workflow_id, item, status, eta, date_arrived, purchase_order_id, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """, (
                workflow_id,
                item,
                status,
                eta,
                date_arrived,
                purchase_order_id
            ))
            await conn.commit()
            logger.info(f"Created procurement item for workflow {workflow_id}: {item} with status {status}")

    except aiosqlite.IntegrityError as e:
        # Constraint violation - non-retryable.
        logger.error(f"Data integrity error: {e}")
        raise DataCorruptionError(f"Data integrity error: {e}") from e

    except aiosqlite.Error as e:
        # Connection/lock problems - let Temporal retry.
        logger.warning(f"Database error creating procurement item (retryable): {e}")
        raise DatabaseError(f"Failed to create procurement item: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error creating procurement item: {e}")
        raise DatabaseError(f"Unexpected error creating procurement item: {e}") from e
async def update_procurement_item(
    workflow_id: str,
    item: str,
    status: Optional[str] = None,
    eta: Optional[str] = None,
    date_arrived: Optional[str] = None,
    purchase_order_id: Optional[str] = None
) -> None:
    """
    Update a procurement item's fields. Only fields passed as non-None are written.

    Args:
        workflow_id: The Temporal workflow ID
        item: The item name (e.g., "Steel Beams")
        status: Optional new status
        eta: Optional new estimated time of arrival
        date_arrived: Optional new arrival date
        purchase_order_id: Optional new purchase order ID

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If workflow_id is invalid or item not found (non-retryable)
    """
    # Mandatory identifiers - invalid input is never retried.
    if not workflow_id or not isinstance(workflow_id, str):
        raise DataCorruptionError("Invalid workflow_id: must be a non-empty string")

    if not item or not isinstance(item, str):
        raise DataCorruptionError("Invalid item: must be a non-empty string")

    # Assemble the SET clause only from the fields the caller supplied.
    provided = {
        "status": status,
        "eta": eta,
        "date_arrived": date_arrived,
        "purchase_order_id": purchase_order_id,
    }
    assignments = [f"{column} = ?" for column, value in provided.items() if value is not None]
    params: list = [value for value in provided.values() if value is not None]

    if not assignments:
        # Nothing to write; keep the no-op behavior (warn and return).
        logger.warning(f"No fields to update for workflow {workflow_id}")
        return

    # Always refresh the modification timestamp.
    assignments.append("updated_at = CURRENT_TIMESTAMP")
    params.extend([workflow_id, item])

    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            query = f"""
                UPDATE procurement_items
                SET {', '.join(assignments)}
                WHERE workflow_id = ? AND item = ?
            """
            cur = await conn.execute(query, params)

            # Zero rows touched means the item does not exist - data error.
            if cur.rowcount == 0:
                logger.error(f"No procurement item found for workflow {workflow_id} with item {item}")
                raise DataCorruptionError(f"No procurement item found for workflow {workflow_id} with item {item}")

            await conn.commit()
            logger.info(f"Updated procurement item for workflow {workflow_id}")

    except DataCorruptionError:
        # Already classified as non-retryable; pass through unchanged.
        raise

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error updating procurement item (retryable): {e}")
        raise DatabaseError(f"Failed to update procurement item: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error updating procurement item: {e}")
        raise DatabaseError(f"Unexpected error updating procurement item: {e}") from e
async def delete_procurement_item(workflow_id: str, item: str) -> None:
    """
    Delete a procurement item from the database.

    Args:
        workflow_id: The Temporal workflow ID
        item: The item name (e.g., "Steel Beams")

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If workflow_id is invalid or item not found (non-retryable)
    """
    # Mandatory identifiers - invalid input is never retried.
    if not workflow_id or not isinstance(workflow_id, str):
        raise DataCorruptionError("Invalid workflow_id: must be a non-empty string")

    if not item or not isinstance(item, str):
        raise DataCorruptionError("Invalid item: must be a non-empty string")

    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            cur = await conn.execute("""
                DELETE FROM procurement_items
                WHERE workflow_id = ? AND item = ?
            """, (workflow_id, item))

            # Zero rows touched means the item does not exist - data error.
            if cur.rowcount == 0:
                logger.error(f"No procurement item found for workflow {workflow_id} with item {item}")
                raise DataCorruptionError(f"No procurement item found for workflow {workflow_id} with item {item}")

            await conn.commit()
            logger.info(f"Deleted procurement item for workflow {workflow_id}")

    except DataCorruptionError:
        # Already classified as non-retryable; pass through unchanged.
        raise

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error deleting procurement item (retryable): {e}")
        raise DatabaseError(f"Failed to delete procurement item: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error deleting procurement item: {e}")
        raise DatabaseError(f"Unexpected error deleting procurement item: {e}") from e
async def get_procurement_item_by_name(workflow_id: str, item: str) -> Optional[dict]:
    """
    Retrieve one procurement item for a specific workflow and item name.

    Args:
        workflow_id: The Temporal workflow ID
        item: The item name (e.g., "Steel Beams")

    Returns:
        The procurement item dict or None if not found

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
        DataCorruptionError: If input validation fails (non-retryable)
    """
    # Mandatory identifiers - invalid input is never retried.
    if not workflow_id or not isinstance(workflow_id, str):
        raise DataCorruptionError("Invalid workflow_id: must be a non-empty string")

    if not item or not isinstance(item, str):
        raise DataCorruptionError("Invalid item: must be a non-empty string")

    columns = (
        "workflow_id", "item", "status", "eta",
        "date_arrived", "purchase_order_id", "created_at", "updated_at",
    )

    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute("""
                SELECT workflow_id, item, status, eta, date_arrived, purchase_order_id, created_at, updated_at
                FROM procurement_items
                WHERE workflow_id = ? AND item = ?
            """, (workflow_id, item)) as cur:
                row = await cur.fetchone()
                if row is None:
                    return None
                return {column: row[column] for column in columns}

    except DataCorruptionError:
        # Already classified as non-retryable; pass through unchanged.
        raise

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error retrieving procurement item (retryable): {e}")
        raise DatabaseError(f"Failed to retrieve procurement item: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error retrieving procurement item: {e}")
        raise DatabaseError(f"Unexpected error retrieving procurement item: {e}") from e
async def get_all_procurement_items() -> list[dict]:
    """
    Retrieve every procurement item in the database, newest first.

    NOTE(review): this is not filtered by workflow_id, so it returns items
    across all workflows - confirm that callers expect the global view.

    Returns:
        List of procurement item dicts

    Raises:
        DatabaseError: If database operation fails (retryable by Temporal)
    """
    columns = (
        "workflow_id", "item", "status", "eta",
        "date_arrived", "purchase_order_id", "created_at", "updated_at",
    )

    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute("""
                SELECT workflow_id, item, status, eta, date_arrived, purchase_order_id, created_at, updated_at
                FROM procurement_items
                ORDER BY created_at DESC
            """) as cur:
                rows = await cur.fetchall()
                return [{column: row[column] for column in columns} for row in rows]

    except aiosqlite.Error as e:
        # Connection problems - let Temporal retry.
        logger.warning(f"Database error retrieving all procurement items (retryable): {e}")
        raise DatabaseError(f"Failed to retrieve all procurement items: {e}") from e

    except Exception as e:
        # Unknown failure - treat as retryable.
        logger.error(f"Unexpected error retrieving all procurement items: {e}")
        raise DatabaseError(f"Unexpected error retrieving all procurement items: {e}") from e
"Submittal_Approved" + SHIPMENT_DEPARTED_FACTORY = "Shipment_Departed_Factory" + SHIPMENT_ARRIVED_SITE = "Shipment_Arrived_Site" + INSPECTION_FAILED = "Inspection_Failed" + INSPECTION_PASSED = "Inspection_Passed" + HUMAN_INPUT = "Human_Input" + +class SubmitalApprovalEvent(BaseModel): + event_type: EventType = Field(default=EventType.SUBMITTAL_APPROVED) + item: str + document_url: str + document_name: str + +class ShipmentDepartedFactoryEvent(BaseModel): + event_type: EventType = Field(default=EventType.SHIPMENT_DEPARTED_FACTORY) + item: str + eta: datetime + date_departed: datetime + location_address: str + +class ShipmentArrivedSiteEvent(BaseModel): + event_type: EventType = Field(default=EventType.SHIPMENT_ARRIVED_SITE) + item: str + date_arrived: datetime + location_address: str + +class InspectionFailedEvent(BaseModel): + event_type: EventType = Field(default=EventType.INSPECTION_FAILED) + item: str + inspection_date: datetime + document_url: str + document_name: str + +class InspectionPassedEvent(BaseModel): + event_type: EventType = Field(default=EventType.INSPECTION_PASSED) + item: str + inspection_date: datetime + document_url: str + document_name: str \ No newline at end of file diff --git a/examples/demos/procurement_agent/project/run_worker.py b/examples/demos/procurement_agent/project/run_worker.py new file mode 100644 index 00000000..127a810f --- /dev/null +++ b/examples/demos/procurement_agent/project/run_worker.py @@ -0,0 +1,96 @@ +import asyncio + +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from project.workflow import ProcurementAgentWorkflow +from project.data.database import init_database +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from project.activities.activities import ( + schedule_inspection, + flag_potential_issue, + issue_purchase_order, + remove_delivery_item, + update_project_end_date, + notify_team_shipment_arrived, + 
async def main():
    """
    Entry point for the procurement agent Temporal worker.

    Sets up debugging, validates configuration, initializes the database,
    assembles the activity list, and runs an AgentexWorker on the configured
    task queue.

    Raises:
        ValueError: when WORKFLOW_TASK_QUEUE is not configured.
        RuntimeError: when database initialization fails.
    """
    try:
        setup_debug_if_enabled()

        queue = environment_variables.WORKFLOW_TASK_QUEUE
        if queue is None:
            raise ValueError("WORKFLOW_TASK_QUEUE is not set")

        # Database must be ready before any activity touches it.
        try:
            await init_database()
            logger.info("Database initialized successfully")
        except Exception as db_err:
            logger.error(f"Failed to initialize database: {db_err}")
            raise RuntimeError(f"Database initialization failed: {db_err}") from db_err

        # Procurement-specific activities registered alongside the framework defaults.
        custom_activities = [
            stream_lifecycle_content,
            issue_purchase_order,
            flag_potential_issue,
            notify_team_shipment_arrived,
            schedule_inspection,
            create_master_construction_schedule,
            get_master_construction_schedule,
            update_delivery_date_for_item,
            remove_delivery_item,
            update_project_end_date,
            create_procurement_item_activity,
            update_procurement_item_activity,
            delete_procurement_item_activity,
            get_procurement_item_by_name_activity,
            get_all_procurement_items_activity,
        ]
        all_activities = get_all_activities() + custom_activities

        # Worker with streaming model provider, context propagation, and automatic tracing.
        worker = AgentexWorker(
            task_queue=queue,
            plugins=[OpenAIAgentsPlugin(model_provider=TemporalStreamingModelProvider())],
            interceptors=[ContextInterceptor()],
        )

        logger.info(f"Starting worker on task queue: {queue}")

        await worker.run(
            activities=all_activities,
            workflow=ProcurementAgentWorkflow,
        )

    except ValueError as cfg_err:
        # Configuration error
        logger.error(f"Configuration error: {cfg_err}")
        raise
    except RuntimeError as init_err:
        # Database or initialization error
        logger.error(f"Initialization error: {init_err}")
        raise
    except Exception as unexpected:
        # Unexpected error
        logger.error(f"Unexpected error in worker: {unexpected}", exc_info=True)
        raise

if __name__ == "__main__":
    asyncio.run(main())
def _standard_item_events(
    item: str,
    *,
    eta: datetime,
    date_departed: datetime,
    date_arrived: datetime,
    inspection_date: datetime,
    passed: bool,
    duplicate_arrival: bool = False,
):
    """Build the canonical scripted event flow for one procurement item.

    Flow: submittal approved -> shipment departed factory -> shipment arrived
    on site -> inspection (passed or failed). Document names are derived from
    the item name; origin/site addresses and document URLs are the fixed demo
    fixtures used by every item.

    Args:
        item: Procurement item name (e.g. "Steel Beams").
        eta: Expected arrival time reported at departure.
        date_departed: When the shipment left the factory.
        date_arrived: When the shipment reached the site.
        inspection_date: When the on-site inspection happened.
        passed: True -> InspectionPassedEvent, False -> InspectionFailedEvent.
        duplicate_arrival: Append a second, identical arrival event — used to
            check the agent does not double-process repeated events.

    Returns:
        List of pydantic event models, in chronological order.
    """
    factory_address = "218 W 18th St, New York, NY 10011"
    site_address = "650 Townsend St, San Francisco, CA 94103"

    arrival = ShipmentArrivedSiteEvent(
        event_type=EventType.SHIPMENT_ARRIVED_SITE,
        item=item,
        date_arrived=date_arrived,
        location_address=site_address,
    )

    if passed:
        inspection = InspectionPassedEvent(
            event_type=EventType.INSPECTION_PASSED,
            item=item,
            inspection_date=inspection_date,
            document_name=f"{item} Inspection Report.pdf",
            document_url="/inspection_passed.pdf",
        )
    else:
        inspection = InspectionFailedEvent(
            event_type=EventType.INSPECTION_FAILED,
            item=item,
            inspection_date=inspection_date,
            document_name=f"{item} Inspection Report.pdf",
            document_url="/inspection_failed.pdf",
        )

    events = [
        SubmitalApprovalEvent(
            event_type=EventType.SUBMITTAL_APPROVED,
            item=item,
            document_name=f"{item} Submittal.pdf",
            document_url="/submittal_approval.pdf",
        ),
        ShipmentDepartedFactoryEvent(
            event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
            item=item,
            eta=eta,
            date_departed=date_departed,
            location_address=factory_address,
        ),
        arrival,
        inspection,
    ]
    if duplicate_arrival:
        # Same payload as the real arrival; only its serialized JSON is sent,
        # so reusing the instance is equivalent to constructing a copy.
        events.append(arrival)
    return events


async def send_fake_events(workflow_id: str):
    """Send the scripted demo procurement events to a running workflow.

    Connects to Temporal, resolves a handle to *workflow_id*, and delivers
    each event as a JSON string via the "send_event" signal, pausing 10s
    between events so processing can be observed in the UI.

    Args:
        workflow_id: Temporal workflow ID of the procurement agent task.
    """

    # Connect to Temporal
    temporal_url = environment_variables.TEMPORAL_ADDRESS or "localhost:7233"
    client = await Client.connect(temporal_url)

    # Get handle to the workflow
    handle = client.get_workflow_handle(workflow_id)

    # Each tuple: (item name, scripted events). Comments record the schedule
    # fixtures the agent is expected to reconcile deliveries against.
    all_events = [
        # Steel Beams: required by 2026-02-15, buffer 5 days;
        # arrives 2026-02-10 (5 days early - within buffer); passes inspection.
        ("Steel Beams", _standard_item_events(
            "Steel Beams",
            eta=datetime(2026, 2, 10, 14, 30),
            date_departed=datetime(2026, 2, 3, 9, 15),
            date_arrived=datetime(2026, 2, 10, 15, 45),
            inspection_date=datetime(2026, 2, 11, 10, 20),
            passed=True,
        )),
        # HVAC Units: required by 2026-03-01, buffer 7 days;
        # arrives 2026-02-22 (7 days early - within buffer); fails inspection.
        ("HVAC Units", _standard_item_events(
            "HVAC Units",
            eta=datetime(2026, 2, 22, 11, 0),
            date_departed=datetime(2026, 2, 15, 13, 45),
            date_arrived=datetime(2026, 2, 22, 10, 30),
            inspection_date=datetime(2026, 2, 23, 14, 15),
            passed=False,
        )),
        # Windows: required by 2026-03-15, buffer 10 days; arrives 2026-03-05
        # (within buffer); passes inspection. Includes a duplicate arrival
        # event to test the agent doesn't double-process.
        ("Windows", _standard_item_events(
            "Windows",
            eta=datetime(2026, 3, 5, 16, 0),
            date_departed=datetime(2026, 2, 20, 8, 30),
            date_arrived=datetime(2026, 3, 5, 16, 20),
            inspection_date=datetime(2026, 3, 6, 9, 45),
            passed=True,
            duplicate_arrival=True,
        )),
        # Flooring Materials: required by 2026-04-01, buffer 3 days;
        # arrives 2026-03-29 (within buffer); passes inspection.
        ("Flooring Materials", _standard_item_events(
            "Flooring Materials",
            eta=datetime(2026, 3, 29, 13, 15),
            date_departed=datetime(2026, 3, 22, 11, 30),
            date_arrived=datetime(2026, 3, 29, 12, 45),
            inspection_date=datetime(2026, 3, 30, 15, 30),
            passed=True,
        )),
        # Electrical Panels: required by 2026-04-15, buffer 5 days; fails
        # inspection — agent should apply learnings from the HVAC failure.
        ("Electrical Panels", _standard_item_events(
            "Electrical Panels",
            eta=datetime(2026, 4, 10, 10, 45),
            date_departed=datetime(2026, 4, 1, 14, 0),
            date_arrived=datetime(2026, 4, 10, 11, 15),
            inspection_date=datetime(2026, 4, 11, 13, 0),
            passed=False,
        )),
    ]

    print(f"Connected to workflow: {workflow_id}")
    print("=" * 60)
    print("Sending procurement events...")
    print("=" * 60)

    for item_name, events in all_events:
        print(f"\n{'=' * 60}")
        print(f"Processing: {item_name}")
        print("=" * 60)

        for i, event in enumerate(events, 1):
            print(f"\n[Event {i}] Sending: {event.event_type.value}")
            print(f"  Item: {event.item}")

            # Show additional details based on event type
            if hasattr(event, 'eta'):
                print(f"  ETA: {event.eta}")
            if hasattr(event, 'date_arrived'):
                print(f"  Date Arrived: {event.date_arrived}")
            if hasattr(event, 'inspection_date'):
                print(f"  Inspection Date: {event.inspection_date}")

            try:
                # Send the event using the send_event signal.
                # The workflow expects each event as a JSON string.
                event_data = event.model_dump_json()
                await handle.signal("send_event", event_data)
                print("✓ Event sent successfully!")

                # Wait a bit between events so you can see them being processed
                await asyncio.sleep(10)

            except Exception as e:
                print(f"✗ Error sending event: {e}")
                logger.error(f"Failed to send event: {e}")

    print("\n" + "=" * 60)
    print("All events have been sent!")
    print("Check your workflow in the UI to see the processed events.")
    print("=" * 60)
def get_new_wait_for_human_context(
    full_conversation: List[Dict[str, Any]],
    extracted_learning_call_ids: Set[str],
) -> Optional[Tuple[List[Dict[str, Any]], str]]:
    """
    Extract the NEW slice of conversation since the last processed wait_for_human call.

    Scans the history from newest to oldest and stops as soon as it reaches a
    wait_for_human call whose learnings were already extracted; everything from
    that boundary item (inclusive) to the end is "new". If no unprocessed
    wait_for_human call exists in that span, there is nothing to extract.

    Args:
        full_conversation: The complete conversation history (self._state.input_list)
        extracted_learning_call_ids: Set of call_ids we've already extracted learnings from

    Returns:
        Tuple of (new_context_messages, call_id) — call_id being the most
        recent unprocessed wait_for_human call — or None when there is none.
    """
    slice_start = 0          # where the "new" portion of the history begins
    found_new = False        # saw at least one unprocessed wait_for_human call
    newest_call_id = None    # call_id of the most recent unprocessed call

    for idx in range(len(full_conversation) - 1, -1, -1):
        entry = full_conversation[idx]

        # Only function-call items named wait_for_human matter for boundaries.
        if isinstance(entry, dict) and entry.get("type") == "function_call" and entry.get("name") == "wait_for_human":
            call_id = entry.get("call_id")

            if call_id in extracted_learning_call_ids:
                # Already handled — everything from here on is new context
                # (the boundary item itself is included, matching prior behavior).
                logger.info(f"Found already-processed wait_for_human call_id: {call_id}, stopping")
                slice_start = idx
                break

            if not found_new:
                # Record only the first (i.e. most recent) unprocessed call.
                found_new = True
                newest_call_id = call_id
                logger.info(f"Found NEW wait_for_human call_id: {call_id}")

    if not found_new:
        logger.info("No new wait_for_human calls found")
        return None

    new_context = list(full_conversation[slice_start:])
    logger.info(f"Extracted {len(new_context)} messages of new context")
    assert newest_call_id is not None, "call_id should be set when found_new_wait_for_human is True"
    return (new_context, newest_call_id)
# Configuration constants
SUMMARIZATION_TOKEN_THRESHOLD = 40000  # Trigger summarization at 40k tokens
PRESERVE_LAST_N_TURNS = 10  # Always keep last 10 user turns in full


def estimate_tokens(text: str) -> int:
    """
    Estimate the number of tokens in a text string using tiktoken.

    Falls back to a rough chars/4 heuristic if tiktoken is unavailable or
    fails for any reason.

    Args:
        text: The text to estimate tokens for

    Returns:
        Estimated token count
    """
    try:
        encoding = tiktoken.encoding_for_model("gpt-4o")
        return len(encoding.encode(text))
    except Exception as e:
        # Fallback to rough estimation if tiktoken fails
        logger.warning(f"Token estimation failed, using fallback: {e}")
        return len(text) // 4  # Rough approximation


def should_summarize(input_list: List[Dict[str, Any]]) -> bool:
    """
    Check if the conversation history exceeds the token threshold and needs summarization.

    Args:
        input_list: The conversation history

    Returns:
        True if summarization should be triggered
    """
    total_tokens = 0

    for item in input_list:
        if isinstance(item, dict):
            # Estimate tokens for the entire item (string-serialized)
            item_str = str(item)
            total_tokens += estimate_tokens(item_str)

    logger.info(f"Total conversation tokens: {total_tokens}")

    if total_tokens > SUMMARIZATION_TOKEN_THRESHOLD:
        logger.info(f"Token threshold exceeded ({total_tokens} > {SUMMARIZATION_TOKEN_THRESHOLD}), summarization needed")
        return True

    return False


def get_messages_to_summarize(
    input_list: List[Dict[str, Any]],
    last_summary_index: Optional[int]
) -> Tuple[List[Dict[str, Any]], int, int]:
    """
    Get the portion of conversation that should be summarized, following OpenCode's approach.

    Strategy:
    - If there's a previous summary, start from AFTER it (never re-summarize summaries)
    - Find last N user turns and preserve them
    - Return everything in between for summarization
    - If there are too few user turns, summarize nothing

    Args:
        input_list: The full conversation history
        last_summary_index: Index of the last summary message (None if no prior summary)

    Returns:
        Tuple of (messages_to_summarize, start_index, end_index)
        - messages_to_summarize: The slice of conversation to summarize
        - start_index: Where the summarization range starts
        - end_index: Where the summarization range ends (exclusive)
    """
    # Find all user turn indices
    user_turn_indices = []
    for i, item in enumerate(input_list):
        if isinstance(item, dict) and item.get("role") == "user":
            user_turn_indices.append(i)

    # Determine the start index (after last summary, or from beginning)
    if last_summary_index is not None:
        start_index = last_summary_index + 1  # Start AFTER the summary
        logger.info(f"Starting summarization after previous summary at index {last_summary_index}")
    else:
        start_index = 0
        logger.info("No previous summary found, starting from beginning")

    # Determine the end index (preserve last N turns)
    if len(user_turn_indices) >= PRESERVE_LAST_N_TURNS:
        # Find the Nth-from-last user turn
        preserve_from_index = user_turn_indices[-PRESERVE_LAST_N_TURNS]
        end_index = preserve_from_index
        logger.info(f"Preserving last {PRESERVE_LAST_N_TURNS} turns from index {preserve_from_index}")
    else:
        # Not enough turns to preserve — summarize nothing.
        # BUG FIX: this previously set end_index = len(input_list), which
        # summarized the ENTIRE conversation (including the most recent turns)
        # precisely when there was too little history, contradicting both the
        # docstring and the preserve-last-N guarantee.
        end_index = start_index
        logger.warning(f"Only {len(user_turn_indices)} user turns, not enough to summarize (need more than {PRESERVE_LAST_N_TURNS})")

    # Extract the messages to summarize
    if end_index <= start_index:
        logger.info("No messages to summarize (end_index <= start_index)")
        return [], start_index, end_index

    messages_to_summarize = input_list[start_index:end_index]
    logger.info(f"Summarizing {len(messages_to_summarize)} messages from index {start_index} to {end_index}")

    return messages_to_summarize, start_index, end_index


def create_summary_message(summary_text: str) -> Dict[str, Any]:
    """
    Create a summary message in the input_list format.

    Args:
        summary_text: The AI-generated summary text

    Returns:
        A dictionary representing the summary message
    """
    return {
        "role": "assistant",
        "content": summary_text,
        "_summary": True,  # Mark this as a summary message
    }


def create_resume_message() -> Dict[str, Any]:
    """
    Create a resume message that instructs the AI to continue from the summary.

    Returns:
        A dictionary representing the resume instruction
    """
    return {
        "role": "user",
        "content": "Use the above summary to continue from where we left off.",
        "_synthetic": True,  # Mark as system-generated
    }


def apply_summary_to_input_list(
    input_list: List[Dict[str, Any]],
    summary_text: str,
    start_index: int,
    end_index: int
) -> List[Dict[str, Any]]:
    """
    Replace the summarized portion of input_list with the summary message.

    Args:
        input_list: The original conversation history
        summary_text: The AI-generated summary
        start_index: Start of summarized range
        end_index: End of summarized range (exclusive)

    Returns:
        New input_list with summary applied
    """
    # Build new input list: [before summary] + [summary] + [resume] + [after summary]
    before_summary = input_list[:start_index] if start_index > 0 else []
    after_summary = input_list[end_index:]

    summary_msg = create_summary_message(summary_text)
    resume_msg = create_resume_message()

    new_input_list = before_summary + [summary_msg, resume_msg] + after_summary

    logger.info(f"Applied summary: reduced from {len(input_list)} to {len(new_input_list)} messages")

    return new_input_list


def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]:
    """
    Find the index of the last summary message in the conversation.

    Args:
        input_list: The conversation history

    Returns:
        Index of the last summary message, or None if no summary exists
    """
    # Scan from the end so the most recent summary wins
    for i in range(len(input_list) - 1, -1, -1):
        item = input_list[i]
        if isinstance(item, dict) and item.get("_summary") is True:
            logger.info(f"Found last summary at index {i}")
            return i

    logger.info("No previous summary found")
    return None
class StateModel(BaseModel):
    """
    State model for preserving conversation history.

    This allows the agent to maintain context throughout the conversation,
    making it possible to reference previous messages and build on the discussion.
    """
    # OpenAI Agents "input list" items (role/content dicts plus function-call
    # entries); appended to per event and replaced wholesale with
    # result.to_input_list() after each agent run.
    input_list: List[Dict[str, Any]]
    def __init__(self):
        """Initialize workflow state, the event/human queues, and the activity retry policy."""
        super().__init__(display_name=environment_variables.AGENT_NAME)
        self._complete_task: bool = False  # Set by complete_task_signal; ends the run loop
        self._task_id = None
        self._trace_id = None
        self._parent_span_id = None
        self._state: StateModel | None = None  # Conversation history; created in on_task_create
        self._workflow_started = False  # Track if agent workflow loop has started
        self.event_queue: asyncio.Queue = asyncio.Queue()  # Events
        self.human_queue: asyncio.Queue = asyncio.Queue()  # Human input
        self.human_input_learnings: list = []  # Learnings extracted from wait_for_human exchanges
        self.extracted_learning_call_ids: set = set()  # Track which wait_for_human calls we've extracted learnings from

        # Define activity retry policy with exponential backoff
        # Based on Temporal best practices from blog post
        self.activity_retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            backoff_coefficient=2.0,  # Exponential backoff
            maximum_interval=timedelta(seconds=120),  # Cap at 2 minutes
            maximum_attempts=5,
            # Error types retrying can never fix — fail the activity immediately
            non_retryable_error_types=[
                "DataCorruptionError",
                "ScheduleNotFoundError",
            ]
        )
+ """ + if self._state is None: + raise ValueError("State is not initialized") + + if params.event.content is None: + workflow.logger.warning("Received event with no content") + return + + # Display the user's message in the UI + await adk.messages.create(task_id=params.task.id, content=params.event.content) + + # After the first event, all subsequent events are human responses to wait_for_human + if self._workflow_started: + # Extract text content and put it in the human_queue for wait_for_human tool + if isinstance(params.event.content, TextContent): + await self.human_queue.put(params.event.content.content) + + @workflow.run + @override + async def on_task_create(self, params: CreateTaskParams) -> str: + logger.info(f"Received task create params: {params}") + + self._state = StateModel(input_list=[]) + + self._task_id = params.task.id + self._trace_id = params.task.id + self._parent_span_id = params.task.id + + workflow_id = workflow.info().workflow_id + + # Create the master construction schedule with error handling + try: + await workflow.execute_activity( + create_master_construction_schedule, + workflow_id, + start_to_close_timeout=timedelta(minutes=5), # Changed from 10s to 5min + schedule_to_close_timeout=timedelta(minutes=10), + retry_policy=self.activity_retry_policy, + ) + logger.info("Master construction schedule created successfully") + + except ApplicationError as e: + # Non-retryable application error (invalid data) + logger.error(f"Failed to create schedule: {e}") + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Failed to initialize project schedule. Please contact support.", + ), + ) + raise # Fail the workflow + + except Exception as e: + # Unexpected error + logger.error(f"Unexpected error creating schedule: {e}") + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="System error during initialization. 
Please try creating a new task.", + ), + ) + raise + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Welcome to the Procurement Agent! I'll help you manage construction deliveries and schedules. Send events to get started.", + ), + ) + + # Mark workflow as started - subsequent events will feed the human_queue + self._workflow_started = True + + while True: + await workflow.wait_condition( + lambda: not self.event_queue.empty(), + timeout=None, + ) + + if not self.event_queue.empty(): + event = await self.event_queue.get() + + await adk.messages.create(task_id=params.task.id, content=DataContent( + author="user", + data=json.loads(event), + )) + + self._state.input_list.append({ + "role": "user", + "content": event, + }) + + # Get master construction schedule with error handling + try: + master_construction_schedule = await workflow.execute_activity( + get_master_construction_schedule, + workflow_id, + start_to_close_timeout=timedelta(minutes=2), # Changed from 10s to 2min + schedule_to_close_timeout=timedelta(minutes=5), + retry_policy=self.activity_retry_policy, + ) + except ApplicationError as e: + # Non-retryable error (schedule not found or corrupted) + logger.error(f"Failed to retrieve schedule for event processing: {e}") + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Unable to access project schedule. Please reinitialize the workflow.", + ), + ) + continue # Skip this event, wait for next one + + except Exception as e: + # Unexpected error retrieving schedule + logger.error(f"Unexpected error retrieving schedule: {e}") + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Temporary system issue. 
Retrying event processing...", + ), + ) + continue # Skip this event, wait for next one + + # Create agent and execute with error handling + try: + procurement_agent = new_procurement_agent( + master_construction_schedule=master_construction_schedule, + human_input_learnings=self.human_input_learnings + ) + + hooks = TemporalStreamingHooks(task_id=params.task.id) + + # Execute agent with graceful degradation pattern (from temporal-community demos) + result = await Runner.run(procurement_agent, self._state.input_list, hooks=hooks) # type: ignore[arg-type] + + # Update state with result + self._state.input_list = result.to_input_list() # type: ignore[assignment] + logger.info("Successfully processed event") + # Extract learnings from NEW wait_for_human calls only (using going backwards approach) + try: + result_context = get_new_wait_for_human_context( + full_conversation=self._state.input_list, + extracted_learning_call_ids=self.extracted_learning_call_ids, + ) + + if result_context is not None: + new_context, call_id = result_context + logger.info("Found new wait_for_human call, extracting learning...") + + # Create extraction agent and run with only the NEW context + extract_agent = new_extract_learnings_agent() + extraction_result = await Runner.run(extract_agent, new_context, hooks=hooks) # type: ignore[arg-type] + + logger.info(f"About to extract learning: {extraction_result.final_output}") + # Append the learning and track the call_id + learning = extraction_result.final_output + if learning: + self.human_input_learnings.append(learning) + self.extracted_learning_call_ids.add(call_id) + logger.info(f"Extracted learning: {learning}") + + except Exception as e: + logger.error(f"Failed to extract learning: {e}") + + # Check if summarization is needed (after learning extraction) + try: + if should_summarize(self._state.input_list): + logger.info("Token threshold exceeded, starting summarization...") + + # Find the last summary index + last_summary_index = 
find_last_summary_index(self._state.input_list) + + # Get messages to summarize (excludes last 10 turns, starts after previous summary) + messages_to_summarize, start_index, end_index = get_messages_to_summarize( + self._state.input_list, + last_summary_index + ) + + if messages_to_summarize: + logger.info(f"Summarizing {len(messages_to_summarize)} messages...") + + # Create summarization agent and run + summary_agent = new_summarization_agent() + summary_result = await Runner.run(summary_agent, messages_to_summarize, hooks=hooks) # type: ignore[arg-type] + + summary_text = summary_result.final_output + if summary_text: + # Apply summary to input_list + self._state.input_list = apply_summary_to_input_list( + self._state.input_list, + summary_text, + start_index, + end_index + ) + logger.info(f"Summarization complete, new input_list length: {len(self._state.input_list)}") + else: + logger.warning("Summarization produced no output") + else: + logger.info("No messages to summarize (not enough turns yet)") + + except Exception as e: + logger.error(f"Failed to summarize conversation: {e}") + + except Exception as e: + # Agent execution failed - graceful degradation + logger.error(f"Agent execution failed processing event: {e}") + + # Notify that event couldn't be processed + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Unable to process this event. The issue has been logged. Please try sending another event.", + ), + ) + + # Don't crash workflow - continue and wait for next event + continue + + if self._complete_task: + return "Task completed" + + @workflow.signal + async def complete_task_signal(self) -> None: + logger.info("Received signal to complete the agent conversation") + self._complete_task = True + + @workflow.signal + async def send_event(self, event: str) -> None: + """ + Receives event strings from external systems with validation. 
+ Events should be JSON strings with event_type and required fields. + Example: {"event_type":"Submittal_Approved","item":"Steel Beams"} + """ + # Validate event is not None or empty + if not event: + logger.error("Received empty or None event") + raise ValueError("Event cannot be empty or None") + + # Validate event is a string + if not isinstance(event, str): + logger.error(f"Event must be string, got {type(event)}") + raise ValueError(f"Event must be a string, received {type(event).__name__}") + + # Validate event length (prevent DoS) + if len(event) > 50000: # 50KB limit + logger.error(f"Event too large: {len(event)} characters") + raise ValueError(f"Event exceeds maximum size (50KB)") + + # Validate event is valid JSON + try: + event_data = json.loads(event) + except json.JSONDecodeError as e: + logger.error(f"Event is not valid JSON: {e}") + raise ValueError(f"Event must be valid JSON: {e}") from e + + # Validate event has required structure + if not isinstance(event_data, dict): + logger.error(f"Event JSON must be an object, got {type(event_data)}") + raise ValueError("Event must be a JSON object") + + # Validate event_type field exists + if "event_type" not in event_data: + logger.error("Event missing 'event_type' field") + raise ValueError("Event must contain 'event_type' field") + + # Validate event_type is one of the allowed types + event_type_str = event_data["event_type"] + valid_event_types = [e.value for e in EventType] + + if event_type_str not in valid_event_types: + logger.error(f"Invalid event_type: {event_type_str}. Valid types: {valid_event_types}") + raise ValueError( + f"Invalid event_type '{event_type_str}'. 
" + f"Must be one of: {', '.join(valid_event_types)}" + ) + + # Validate event structure based on type using Pydantic models + try: + if event_type_str == EventType.SUBMITTAL_APPROVED.value: + SubmitalApprovalEvent(**event_data) + elif event_type_str == EventType.SHIPMENT_DEPARTED_FACTORY.value: + ShipmentDepartedFactoryEvent(**event_data) + elif event_type_str == EventType.SHIPMENT_ARRIVED_SITE.value: + ShipmentArrivedSiteEvent(**event_data) + elif event_type_str == EventType.INSPECTION_FAILED.value: + InspectionFailedEvent(**event_data) + elif event_type_str == EventType.INSPECTION_PASSED.value: + InspectionPassedEvent(**event_data) + elif event_type_str == EventType.HUMAN_INPUT.value: + # HUMAN_INPUT doesn't have a specific model, just needs event_type + pass + + except Exception as e: + logger.error(f"Event validation failed for {event_type_str}: {e}") + raise ValueError(f"Invalid event structure for {event_type_str}: {e}") from e + + logger.info(f"Validated event type: {event_type_str}") + await self.event_queue.put(event) \ No newline at end of file diff --git a/examples/demos/procurement_agent/pyproject.toml b/examples/demos/procurement_agent/pyproject.toml new file mode 100644 index 00000000..7ccbf80e --- /dev/null +++ b/examples/demos/procurement_agent/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "procurement_agent" +version = "0.1.0" +description = "An Agentex agent that manages procurement for building constructions" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk>=0.6.5", + "openai-agents>=0.4.2", + "temporalio>=1.18.2", + "scale-gp", + "aiosqlite", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 \ No newline at end 
of file diff --git a/uv.lock b/uv.lock index 82183068..379f3c63 100644 --- a/uv.lock +++ b/uv.lock @@ -8,7 +8,7 @@ resolution-markers = [ [[package]] name = "agentex-sdk" -version = "0.6.2" +version = "0.6.5" source = { editable = "." } dependencies = [ { name = "aiohttp" },