3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
__pycache__/
.env
.venv/
cua_doc.pdf
71 changes: 56 additions & 15 deletions agent/agent.py
@@ -116,26 +116,67 @@ def run_full_turn(
self.print_steps = print_steps
self.debug = debug
self.show_images = show_images
new_items = []

# keep looping until we get a final response
while new_items[-1].get("role") != "assistant" if new_items else True:
self.debug_print([sanitize_message(msg) for msg in input_items + new_items])

response = create_response(
transcript: list[dict] = [] # keep for debugging/return
pending: list[dict] = input_items.copy() # start with user/system messages
previous_response_id = None

while True:
self.debug_print([sanitize_message(msg) for msg in transcript + pending])

# Deduplicate by id within this batch (API rejects duplicates).
seen_ids: set[str] = set()
payload: list[dict] = []
for m in pending:
mid = m.get("id") if isinstance(m, dict) else None
if mid and mid in seen_ids:
continue
if mid:
seen_ids.add(mid)
payload.append(m)

req = dict(
model=self.model,
input=input_items + new_items,
input=payload,
tools=self.tools,
truncation="auto",
)
if previous_response_id:
req["previous_response_id"] = previous_response_id

response = create_response(**req)
self.debug_print(response)

if "output" not in response and self.debug:
print(response)
previous_response_id = response.get("id")

if "output" not in response:
raise ValueError("No output from model")
else:
new_items += response["output"]
for item in response["output"]:
new_items += self.handle_item(item)

return new_items
# prepare for next loop
new_pending: list[dict] = []

for item in response["output"]:
t_type = item.get("type")

if t_type == "computer_call":
# Execute call and collect observations to send next.
obs = self.handle_item(item)
transcript.append(item) # keep for local log
transcript.extend(obs)
new_pending.extend(obs)
# After a computer_call we immediately break to send obs.
break
else:
transcript.append(item)
# assistant/user messages do not get re-sent; rely on previous_response_id

# Loop exit: stop if the newest pending item is an assistant message (no further tool calls)
if new_pending and new_pending[-1].get("role") == "assistant":
break

pending = new_pending if new_pending else []

# Nothing new to send (no computer_call in this batch): the turn is complete
if not pending:
break

return transcript
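
The essential change above is the continuation pattern: rather than re-sending the whole conversation (input_items + new_items) on every request, each follow-up carries only the newest tool observations plus previous_response_id. A minimal sketch of that loop follows, with a stub fake_create_response standing in for the real API; the stub, its canned outputs, and run_turn are illustrative, not part of this PR.

def fake_create_response(**req):
    # Stand-in for the Responses API, used only in this sketch: the first call
    # returns a computer_call, the chained follow-up returns the final message.
    if not req.get("previous_response_id"):
        return {"id": "resp_1", "output": [{"type": "computer_call", "call_id": "c1"}]}
    return {"id": "resp_2", "output": [{"type": "message", "role": "assistant"}]}

def run_turn(user_items: list[dict]) -> list[dict]:
    transcript: list[dict] = []
    pending: list[dict] = list(user_items)
    previous_response_id = None
    while True:
        req = {"model": "computer-use-preview", "input": pending}
        if previous_response_id:
            req["previous_response_id"] = previous_response_id
        response = fake_create_response(**req)
        previous_response_id = response["id"]
        new_pending: list[dict] = []
        for item in response["output"]:
            transcript.append(item)
            if item.get("type") == "computer_call":
                # Only the fresh observation is re-sent; everything earlier is
                # carried server-side by previous_response_id.
                obs = {"type": "computer_call_output", "call_id": item["call_id"]}
                transcript.append(obs)
                new_pending.append(obs)
                break
        if not new_pending:  # no tool call in this batch: the turn is done
            break
        pending = new_pending
    return transcript

Calling run_turn([{"role": "user", "content": "hi"}]) yields a three-item transcript (the computer_call, its output, then the assistant message), while the second request body contains only the single observation.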
1 change: 1 addition & 0 deletions requirements.txt
@@ -21,3 +21,4 @@ scrapybara>=2.3.6
sniffio==1.3.1
typing_extensions==4.12.2
urllib3==2.3.0
openai==1.75.0
47 changes: 38 additions & 9 deletions simple_cua_loop.py
@@ -65,31 +65,60 @@ def main():
}
]

items = []
transcript: list[dict] = []
while True: # get user input forever
user_input = input("> ")
items.append({"role": "user", "content": user_input})

while True: # keep looping until we get a final response
response = create_response(
pending: list[dict] = [{"role": "user", "content": user_input}]
previous_response_id = None

while True:
seen_ids: set[str] = set()
payload: list[dict] = []
for m in pending:
mid = m.get("id") if isinstance(m, dict) else None
if mid and mid in seen_ids:
continue
if mid:
seen_ids.add(mid)
payload.append(m)

req = dict(
model="computer-use-preview",
input=items,
input=payload,
tools=tools,
truncation="auto",
)
if previous_response_id:
req["previous_response_id"] = previous_response_id

response = create_response(**req)

if "output" not in response:
print(response)
raise ValueError("No output from model")

items += response["output"]
previous_response_id = response.get("id")

new_pending: list[dict] = []

for item in response["output"]:
items += handle_item(item, computer)
if item.get("type") == "computer_call":
obs = handle_item(item, computer)
transcript.append(item)
transcript.extend(obs)
new_pending.extend(obs)
break
else:
transcript.append(item)

if new_pending and new_pending[-1].get("role") == "assistant":
break

if items[-1].get("role") == "assistant":
if not new_pending:
break

pending = new_pending


if __name__ == "__main__":
main()
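
The id-deduplication block is now copied verbatim into both agent.py and simple_cua_loop.py. If the two loops keep evolving, it may be worth lifting into a shared helper; one possible shape, where the name dedupe_by_id and its placement are hypothetical and not part of this PR:

def dedupe_by_id(pending: list[dict]) -> list[dict]:
    # Drop items whose "id" repeats within one request batch;
    # the Responses API rejects duplicate ids in a single input.
    seen: set[str] = set()
    payload: list[dict] = []
    for m in pending:
        mid = m.get("id") if isinstance(m, dict) else None
        if mid:
            if mid in seen:
                continue
            seen.add(mid)
        payload.append(m)
    return payload

Both call sites would then reduce to payload = dedupe_by_id(pending).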
69 changes: 47 additions & 22 deletions utils.py
@@ -1,14 +1,22 @@
from __future__ import annotations

import os
import requests
from dotenv import load_dotenv
import json
import base64
from PIL import Image
from io import BytesIO
import io
from io import BytesIO
from typing import Any, Dict
from urllib.parse import urlparse

load_dotenv(override=True)
from dotenv import load_dotenv
from PIL import Image

import openai


openai.api_key = os.getenv("OPENAI_API_KEY", openai.api_key)
if org := os.getenv("OPENAI_ORG"):
openai.organization = org

BLOCKED_DOMAINS = [
"maliciousbook.com",
@@ -47,23 +55,40 @@ def sanitize_message(msg: dict) -> dict:
return msg


def create_response(**kwargs):
url = "https://api.openai.com/v1/responses"
headers = {
"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
"Content-Type": "application/json"
}

openai_org = os.getenv("OPENAI_ORG")
if openai_org:
headers["Openai-Organization"] = openai_org

response = requests.post(url, headers=headers, json=kwargs)

if response.status_code != 200:
print(f"Error: {response.status_code} {response.text}")

return response.json()
def create_response(**kwargs: Any) -> Dict[str, Any]:
"""Wrapper around *openai.responses.create* with dict output.

The original implementation issued a raw POST and returned ``response.json()``.
Most call sites therefore expect a *plain dict* with keys like ``"output"``
and ``"id"``. The OpenAI SDK, however, returns a *pydantic* model.

To stay backward-compatible we convert the SDK object back to ``dict`` via
its ``to_dict()``/``model_dump()`` helper before returning. This way none
of the downstream code needs to change its access pattern.
"""

# The callers may or may not supply ``previous_response_id``. The SDK
# requires ``openai.NOT_GIVEN`` for omitted optional parameters instead of
# plain ``None``. Convert to the sentinel so that accidental "None"
# doesn't override the continuation logic.
if "previous_response_id" in kwargs and kwargs["previous_response_id"] is None:
kwargs["previous_response_id"] = openai.NOT_GIVEN

try:
sdk_response = openai.responses.create(**kwargs)
except openai.OpenAIError as e: # pragma: no cover – network/credential errors only at runtime.
# Mirror the old behaviour of printing status & body for debugging.
print("Error while calling Responses API:", e)
raise

# Convert to dict for compatibility.
if hasattr(sdk_response, "to_dict"):
return sdk_response.to_dict()
# Fallback – older versions expose ``model_dump``.
if hasattr(sdk_response, "model_dump"):
return sdk_response.model_dump()
# As a last resort, expose the underlying ``__dict__``.
return dict(sdk_response.__dict__)


def check_blocklisted_url(url: str) -> None:
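
For reference, a round trip through the new wrapper might look like the following sketch; the prompt is a placeholder, the tool list is elided, and an OPENAI_API_KEY in the environment (or .env) is assumed:

resp = create_response(
    model="computer-use-preview",
    input=[{"role": "user", "content": "Open the browser."}],
    tools=[],
    truncation="auto",
    previous_response_id=None,  # the wrapper swaps this for openai.NOT_GIVEN
)
# Plain-dict access keeps working exactly as it did with requests + .json().
print(resp["id"], [item.get("type") for item in resp.get("output", [])])

Because the wrapper converts the SDK's pydantic model back to a dict, downstream code written against the old requests-based implementation needs no changes.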