From 4441d373dccb68c81c0562d0f6f4c8970eefce52 Mon Sep 17 00:00:00 2001 From: Erben Mo Date: Tue, 4 Nov 2025 16:46:29 -0800 Subject: [PATCH 1/5] Basic implementation of ACP agent interface using Amazon Q CLI agent The following features are supported: - New Session setup ("session/new") - Basic chat ("session/prompt" & "session/update") - Using built-in tool like fs_read and fs_write ("tool_call") - Request tool call permission ("session/request_permission") We use Symposium ACP as it provides a nicer interface to write non-blocking ACP agent The main method is handle_prompt_request: - submit the request to the agent - poll agent update events, convert them to ACP events, and send them back to ACP client - tell ACP client that the request is completed You can test this implementation using acp_client: cargo run -p agent -- acp-client ./target/debug/agent --- .gitignore | 2 + Cargo.lock | 142 +++++++ crates/agent/Cargo.toml | 2 + crates/agent/src/acp/README.md | 40 ++ crates/agent/src/acp/acp_agent.rs | 361 ++++++++++++++++++ crates/agent/src/acp/acp_client.rs | 178 +++++++++ crates/agent/src/acp/mod.rs | 2 + crates/agent/src/agent/mod.rs | 8 + .../agent/src/agent/util/request_channel.rs | 6 + crates/agent/src/cli/mod.rs | 14 + crates/agent/src/main.rs | 1 + 11 files changed, 756 insertions(+) create mode 100644 crates/agent/src/acp/README.md create mode 100644 crates/agent/src/acp/acp_agent.rs create mode 100644 crates/agent/src/acp/acp_client.rs create mode 100644 crates/agent/src/acp/mod.rs diff --git a/.gitignore b/.gitignore index d0c19d1433..c09f17f6da 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,5 @@ book/ run-build.sh .amazonq/ + +chat.log diff --git a/Cargo.lock b/Cargo.lock index 73a6f2ea8f..63ae39ee9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,7 @@ dependencies = [ name = "agent" version = "1.19.3" dependencies = [ + "agent-client-protocol", "amzn-codewhisperer-client", "amzn-codewhisperer-streaming-client", "amzn-consolas-client", @@ -97,6 +98,7 @@ dependencies = [ "rusqlite", "rustls 0.23.33", "rustls-native-certs 0.8.2", + "sacp", "schemars", "semver", "serde", @@ -123,6 +125,37 @@ dependencies = [ "whoami", ] +[[package]] +name = "agent-client-protocol" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525705e39c11cd73f7bc784e3681a9386aa30c8d0630808d3dc2237eb4f9cb1b" +dependencies = [ + "agent-client-protocol-schema", + "anyhow", + "async-broadcast", + "async-trait", + "derive_more", + "futures", + "log", + "parking_lot", + "serde", + "serde_json", +] + +[[package]] +name = "agent-client-protocol-schema" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d08d095e8069115774caa50392e9c818e3fb1c482ef4f3153d26b4595482f2" +dependencies = [ + "anyhow", + "derive_more", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "ahash" version = "0.8.12" @@ -417,6 +450,18 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.32" @@ -1115,6 +1160,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" +[[package]] +name = "boxfnonce" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5988cb1d626264ac94100be357308f29ff7cbdd3b36bda27f450a4ee3f713426" + [[package]] name = "bs58" version = "0.5.1" @@ -1848,6 +1899,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -2269,6 +2329,27 @@ dependencies = [ "syn 2.0.107", ] +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.107", + "unicode-xid", +] + [[package]] name = "deunicode" version = "1.6.2" @@ -2631,6 +2712,27 @@ dependencies = [ "num-traits", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "exr" version = "1.73.0" @@ -3961,6 +4063,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonrpcmsg" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d833a15225c779251e13929203518c2ff26e2fe0f322d584b213f4f4dad37bd" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jsonschema" version = "0.30.0" @@ -5139,6 +5251,12 @@ version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c6901729fa79e91a0913333229e9ca5dc725089d1c363b2f4b4760709dc4a52" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -6335,6 +6453,24 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "sacp" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c65796fbe83e5f154c3db148b384c9305b64e9b23b5b2eaff761e0dd127e3556" +dependencies = [ + "agent-client-protocol-schema", + "anyhow", + "boxfnonce", + "futures", + "jsonrpcmsg", + "serde", + "serde_json", + "thiserror 1.0.69", + "tracing", + "uuid", +] + [[package]] name = "safetensors" version = "0.4.5" @@ -7862,6 +7998,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unicode_categories" version = "0.1.1" diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml index 4e568a62dc..0d35c1537d 100644 --- a/crates/agent/Cargo.toml +++ b/crates/agent/Cargo.toml @@ -8,6 +8,7 @@ version.workspace = true license.workspace = true [dependencies] +agent-client-protocol = "0.7.0" amzn-codewhisperer-client.workspace = true amzn-codewhisperer-streaming-client.workspace = true amzn-consolas-client.workspace = true @@ -55,6 +56,7 @@ rmcp = { version = "0.8.0", features = ["client", "transport-async-rw", "transpo rusqlite.workspace = true rustls.workspace = true rustls-native-certs.workspace = true +sacp = "0.1" schemars = "1.0.4" semver.workspace = true serde.workspace = true diff --git a/crates/agent/src/acp/README.md b/crates/agent/src/acp/README.md new file mode 100644 index 0000000000..631cd8761b --- /dev/null +++ b/crates/agent/src/acp/README.md @@ -0,0 +1,40 @@ +# Agent-Client-Protocol (ACP) + +This is an implementation of ACP agent interface using Amazon Q CLI agent + +## Features + +### Supported +- New Session setup ("session/new") +- Basic chat ("session/prompt" & "session/update") +- Using built-in tool like fs_read and fs_write ("tool_call") +- Request tool call permission ("session/request_permission") + +### Not Supported +- Tool call update ("tool_call_update" as part of "session/update") +- Auth +- Slash commands +- MCP +- Cancel +- Session reload + +## Usage + +### Run ACP Agent (Standalone) +```bash +cargo run -p agent -- acp + +# Test it with: +{"jsonrpc":"2.0","id":0,"method":"initialize","params":{"protocolVersion":1}} +``` + +### Run ACP Client (for testing) +```bash +# Build agent first +cargo build -p agent + +# Run interactive test client +cargo run -p agent -- acp-client ./target/debug/agent +``` + +The test client automatically launch the ACP Agent. It provides a REPL interface for sending prompts to the agent and automatically approves tool permissions. \ No newline at end of file diff --git a/crates/agent/src/acp/acp_agent.rs b/crates/agent/src/acp/acp_agent.rs new file mode 100644 index 0000000000..4c79a6f5c2 --- /dev/null +++ b/crates/agent/src/acp/acp_agent.rs @@ -0,0 +1,361 @@ +//! ACP Agent interface for Q CLI agent +//! +//! Usage (from workspace root): +//! ```bash +//! cargo run -p agent -- acp +//! ``` + +use std::cell::RefCell; +use std::collections::HashMap; +use std::process::ExitCode; +use std::rc::Rc; +use std::sync::Arc; + +use agent::agent_loop::types::ToolUseBlock; +use agent::api_client::ApiClient; +use agent::mcp::McpManager; +use agent::protocol::{ + AgentEvent, + AgentStopReason, + ContentChunk, + SendPromptArgs, + UpdateEvent, +}; +use agent::rts::{ + RtsModel, + RtsModelState, +}; +use agent::types::AgentSnapshot; +use agent::{ + Agent, + AgentHandle, +}; +use eyre::Result; +use sacp::{ + AgentCapabilities, + CancelNotification, + ContentBlock, + ContentChunk as SacpContentChunk, + Implementation, + InitializeRequest, + InitializeResponse, + JrConnection, + JrRequestCx, + NewSessionRequest, + NewSessionResponse, + PermissionOption, + PermissionOptionId, + PermissionOptionKind, + PromptRequest, + PromptResponse, + RequestPermissionRequest, + SessionId, + SessionNotification, + SessionUpdate, + StopReason, + TextContent, + ToolCall, + ToolCallId, + ToolCallStatus, + ToolCallUpdate, + ToolCallUpdateFields, + ToolKind, + V1, +}; +use tokio_util::compat::{ + TokioAsyncReadCompatExt, + TokioAsyncWriteCompatExt, +}; +use tracing::info; + +/// ACP Session that processes requests using Amazon Q agent +struct AcpSession { + agent: AgentHandle, +} + +impl AcpSession { + /// Create a new ACP session handler with Amazon Q backend + async fn new() -> Result { + // Create agent snapshot + let snapshot = AgentSnapshot::default(); + + // Create RTS model + let rts_state = RtsModelState::new(); + let model = Arc::new(RtsModel::new( + ApiClient::new().await?, + rts_state.conversation_id, + rts_state.model_id, + )); + + // Spawn agent + let agent = Agent::new(snapshot, model, McpManager::new().spawn()).await?.spawn(); + + Ok(Self { agent }) + } + + /// Handle user request from ACP client: + /// - submit the request to the agent + /// - poll agent update events, convert them to ACP events, and send them back to ACP client + /// - tell ACP client that the request is completed + async fn handle_prompt_request( + &self, + request: PromptRequest, + request_cx: JrRequestCx, + ) -> Result<(), sacp::Error> { + let session_id = request.session_id.clone(); + let mut agent = self.agent.clone(); + + // Send user request to agent (non-blocking) + self.send_request_async(&request).await?; + + // We want to avoid blocking the main event loop because it needs to do other work! + // so spawn a new task and wait for end of turn + let _ = request_cx.clone().spawn(async move { + loop { + match agent.recv().await { + Ok(event) => match event { + AgentEvent::Update(update_event) => { + // Forward updates to ACP client via notifications + if let Some(session_update) = convert_update_event(update_event) { + request_cx.send_notification(SessionNotification { + session_id: session_id.clone(), + update: session_update, + meta: None, + })?; + } + }, + AgentEvent::ApprovalRequest { id, tool_use, context } => { + info!( + "AgentEvent::ApprovalRequest: id={}, tool_use={:?}, context={:?}", + id, tool_use, context + ); + handle_approval_request(id, tool_use, session_id.clone(), agent.clone(), &request_cx)?; + }, + AgentEvent::EndTurn(_metadata) => { + // Conversation complete - respond and exit task + return request_cx.respond(PromptResponse { + stop_reason: StopReason::EndTurn, + meta: None, + }); + }, + AgentEvent::Stop(AgentStopReason::Error(_)) => { + // Agent error - respond with error + return request_cx.respond_with_error(sacp::Error::internal_error()); + }, + _ => { + // Handle other agent events if needed + }, + }, + Err(_) => { + // Agent channel closed unexpectedly + return request_cx.respond_with_error(sacp::Error::internal_error()); + }, + } + } + }); + + Ok(()) + } + + /// Send user request to agent + async fn send_request_async(&self, request: &PromptRequest) -> Result<(), sacp::Error> { + // Convert ACP prompt request to agent format + let content: Vec = request + .prompt + .iter() + .filter_map(|block| match block { + ContentBlock::Text(text_content) => { + Some(agent::protocol::ContentChunk::Text(text_content.text.clone())) + }, + _ => None, // Skip non-text content for now + }) + .collect(); + + // Send prompt to agent asynchronously + self.agent + .send_prompt_async(SendPromptArgs { + content, + should_continue_turn: None, + }) + .await + .map_err(|_| sacp::Error::internal_error())?; + + Ok(()) + } +} + +/// Convert agent UpdateEvent to ACP SessionUpdate +fn convert_update_event(update_event: UpdateEvent) -> Option { + match update_event { + UpdateEvent::AgentContent(ContentChunk::Text(text)) => { + Some(SessionUpdate::AgentMessageChunk(SacpContentChunk { + content: ContentBlock::Text(TextContent { + text, + annotations: None, + meta: None, + }), + meta: None, + })) + }, + UpdateEvent::ToolCall(tool_call) => { + let sacp_tool_call = ToolCall { + id: ToolCallId(tool_call.id.into()), + title: tool_call.tool_use_block.name.clone(), + kind: ToolKind::default(), + status: ToolCallStatus::Pending, + content: vec![], + locations: vec![], + raw_input: Some(tool_call.tool_use_block.input.clone()), + raw_output: None, + meta: None, + }; + Some(SessionUpdate::ToolCall(sacp_tool_call)) + }, + _ => None, // Skip other events + } +} + +/// Handle tool use approval request +fn handle_approval_request( + id: String, + tool_use: ToolUseBlock, + session_id: SessionId, + agent: AgentHandle, + request_cx: &JrRequestCx, +) -> Result<(), sacp::Error> { + let permission_request = RequestPermissionRequest { + session_id, + tool_call: ToolCallUpdate { + id: ToolCallId(tool_use.tool_use_id.clone().into()), + fields: ToolCallUpdateFields { + status: Some(ToolCallStatus::Pending), + title: Some(tool_use.name.clone()), + raw_input: Some(tool_use.input.clone()), + ..Default::default() + }, + meta: None, + }, + options: vec![ + PermissionOption { + id: PermissionOptionId("allow".into()), + name: "Allow".to_string(), + kind: PermissionOptionKind::AllowOnce, + meta: None, + }, + PermissionOption { + id: PermissionOptionId("deny".into()), + name: "Deny".to_string(), + kind: PermissionOptionKind::RejectOnce, + meta: None, + }, + ], + meta: None, + }; + + request_cx + .send_request(permission_request) + .await_when_result_received(|result| async move { + info!("Permission request result: {:?}", result); + let approval_result = match result { + Ok(response) => match &response.outcome { + sacp::RequestPermissionOutcome::Selected { option_id } => { + if option_id.0.as_ref() == "allow" { + agent::protocol::ApprovalResult::Approve + } else { + agent::protocol::ApprovalResult::Deny { reason: None } + } + }, + sacp::RequestPermissionOutcome::Cancelled => agent::protocol::ApprovalResult::Deny { + reason: Some("Cancelled".to_string()), + }, + }, + Err(_) => agent::protocol::ApprovalResult::Deny { + reason: Some("Request failed".to_string()), + }, + }; + + let _ = agent + .send_tool_use_approval_result(agent::protocol::SendApprovalResultArgs { + id, + result: approval_result, + }) + .await; + Ok(()) + }) +} + +/// Entry point for SACP agent +pub async fn execute() -> Result { + let outgoing = tokio::io::stdout().compat_write(); + let incoming = tokio::io::stdin().compat(); + + // Create session manager + let sessions = Rc::new(RefCell::new(HashMap::new())); + + let local_set = tokio::task::LocalSet::new(); + local_set + .run_until(async move { + // Create SACP connection with handlers + let connection = JrConnection::new(outgoing, incoming) + // Handle initialize request + .on_receive_request({ + async move |_request: InitializeRequest, request_cx| { + request_cx.respond(InitializeResponse { + protocol_version: V1, + agent_capabilities: AgentCapabilities::default(), + auth_methods: Vec::new(), + agent_info: Some(Implementation { + name: "amazon-q-agent".to_string(), + title: Some("Amazon Q Agent".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }), + meta: None, + }) + } + }) + // Handle new_session request + .on_receive_request({ + let sessions = Rc::clone(&sessions); + async move |_request: NewSessionRequest, request_cx| { + let session_id = SessionId(uuid::Uuid::new_v4().to_string().into()); + let session = Rc::new(AcpSession::new().await.map_err(|_| sacp::Error::internal_error())?); + + sessions.borrow_mut().insert(session_id.clone(), session); + + request_cx.respond(NewSessionResponse { + session_id, + modes: None, + meta: None, + }) + } + }) + // Handle prompt request + .on_receive_request({ + let sessions = Rc::clone(&sessions); + async move |request: PromptRequest, request_cx| { + let session = sessions.borrow().get(&request.session_id).cloned(); + + match session { + Some(session) => session.handle_prompt_request(request, request_cx).await, + None => request_cx.respond_with_error(sacp::Error::invalid_request()), + } + } + }) + // Handle cancel notification + .on_receive_notification({ + async move |_notification: CancelNotification, _cx| { + // TODO: Implement cancellation if needed + Ok(()) + } + }); + + // Run the connection + connection + .serve() + .await + .map_err(|e| eyre::eyre!("Connection error: {}", e)) + }) + .await?; + + Ok(ExitCode::SUCCESS) +} diff --git a/crates/agent/src/acp/acp_client.rs b/crates/agent/src/acp/acp_client.rs new file mode 100644 index 0000000000..3bb3f3003d --- /dev/null +++ b/crates/agent/src/acp/acp_client.rs @@ -0,0 +1,178 @@ +//! Interactive ACP Test Client for testing ACP agents. +//! +//! Usage (from workspace root): +//! ```bash +//! # Run the interactive test client (from workspace root) +//! cargo run -p agent -- acp-client ./target/debug/agent +//! ``` + +use std::process::ExitCode; + +use agent_client_protocol::{ + self as acp, + Agent as _, +}; +use eyre::Result; +use tokio_util::compat::{ + TokioAsyncReadCompatExt, + TokioAsyncWriteCompatExt, +}; + +struct AcpClient; + +#[async_trait::async_trait(?Send)] +impl acp::Client for AcpClient { + async fn session_notification(&self, args: acp::SessionNotification) -> acp::Result<()> { + match args.update { + acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => match content { + acp::ContentBlock::Text(text) => println!("Agent: {}", text.text), + _ => println!("Agent: "), + }, + acp::SessionUpdate::ToolCall(tool_call) => { + println!("🔧 Tool Call: {:#?}", tool_call); + }, + _ => { + // Handle other session update types if needed + }, + } + Ok(()) + } + + async fn request_permission( + &self, + args: acp::RequestPermissionRequest, + ) -> acp::Result { + println!("Permission request from server: {:?}", args); + + // Auto-approve first option if available + let option_id = args + .options + .first() + .map(|opt| opt.id.clone()) + .ok_or_else(|| acp::Error::internal_error())?; + + Ok(acp::RequestPermissionResponse { + outcome: acp::RequestPermissionOutcome::Selected { option_id }, + meta: None, + }) + } + + async fn write_text_file(&self, _args: acp::WriteTextFileRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn read_text_file(&self, _args: acp::ReadTextFileRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn create_terminal(&self, _args: acp::CreateTerminalRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn terminal_output(&self, _args: acp::TerminalOutputRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn release_terminal(&self, _args: acp::ReleaseTerminalRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn wait_for_terminal_exit( + &self, + _args: acp::WaitForTerminalExitRequest, + ) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn kill_terminal_command( + &self, + _args: acp::KillTerminalCommandRequest, + ) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn ext_method(&self, _args: acp::ExtRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn ext_notification(&self, _args: acp::ExtNotification) -> acp::Result<()> { + Err(acp::Error::method_not_found()) + } +} + +pub async fn execute(agent_path: String) -> Result { + let mut child = tokio::process::Command::new(&agent_path) + .arg("acp") // Add the acp subcommand + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .kill_on_drop(true) + .spawn()?; + + let outgoing = child.stdin.take().unwrap().compat_write(); + let incoming = child.stdout.take().unwrap().compat(); + + let local_set = tokio::task::LocalSet::new(); + local_set + .run_until(async move { + let (conn, handle_io) = acp::ClientSideConnection::new(AcpClient, outgoing, incoming, |fut| { + tokio::task::spawn_local(fut); + }); + + tokio::task::spawn_local(handle_io); + + // Initialize connection + conn.initialize(acp::InitializeRequest { + protocol_version: acp::V1, + client_capabilities: acp::ClientCapabilities::default(), + client_info: Some(acp::Implementation { + name: "acp-test-client".to_string(), + title: Some("ACP Test Client".to_string()), + version: "0.1.0".to_string(), + }), + meta: None, + }) + .await?; + + // Create session + let session = conn + .new_session(acp::NewSessionRequest { + mcp_servers: Vec::new(), + cwd: std::env::current_dir()?, + meta: None, + }) + .await?; + + // Interactive prompt loop + println!("ACP Test Client - Type messages to send to agent (Ctrl+C to exit)"); + loop { + print!("> "); + std::io::Write::flush(&mut std::io::stdout())?; + + let mut input = String::new(); + if std::io::stdin().read_line(&mut input)? == 0 { + break; // EOF + } + + let input = input.trim(); + if input.is_empty() { + continue; + } + + conn.prompt(acp::PromptRequest { + session_id: session.session_id.clone(), + prompt: vec![acp::ContentBlock::Text(acp::TextContent { + text: input.to_string(), + annotations: None, + meta: None, + })], + meta: None, + }) + .await?; + } + + Ok::<(), eyre::Error>(()) + }) + .await?; + + Ok(ExitCode::SUCCESS) +} diff --git a/crates/agent/src/acp/mod.rs b/crates/agent/src/acp/mod.rs new file mode 100644 index 0000000000..f12d01cb84 --- /dev/null +++ b/crates/agent/src/acp/mod.rs @@ -0,0 +1,2 @@ +pub mod acp_agent; +pub mod acp_client; diff --git a/crates/agent/src/agent/mod.rs b/crates/agent/src/agent/mod.rs index 42b9e19e71..45c8ab9c93 100644 --- a/crates/agent/src/agent/mod.rs +++ b/crates/agent/src/agent/mod.rs @@ -192,6 +192,14 @@ impl AgentHandle { } } + pub async fn send_prompt_async(&self, args: SendPromptArgs) -> Result<(), AgentError> { + self.sender + .send_async(AgentRequest::SendPrompt(args)) + .await + .map_err(|_| AgentError::Channel)?; + Ok(()) + } + pub async fn send_tool_use_approval_result(&self, args: SendApprovalResultArgs) -> Result<(), AgentError> { match self .sender diff --git a/crates/agent/src/agent/util/request_channel.rs b/crates/agent/src/agent/util/request_channel.rs index e35a438c37..bd7958b228 100644 --- a/crates/agent/src/agent/util/request_channel.rs +++ b/crates/agent/src/agent/util/request_channel.rs @@ -88,6 +88,12 @@ where }, } } + + pub async fn send_async(&self, payload: Req) -> Result<(), mpsc::error::SendError>> { + let (res_tx, _res_rx) = oneshot::channel(); + let request = Request { payload, res_tx }; + self.tx.send(request).await + } } pub type RequestReceiver = mpsc::Receiver>; diff --git a/crates/agent/src/cli/mod.rs b/crates/agent/src/cli/mod.rs index dd40d58e9b..120b22c283 100644 --- a/crates/agent/src/cli/mod.rs +++ b/crates/agent/src/cli/mod.rs @@ -27,6 +27,11 @@ use tracing_subscriber::{ Registry, }; +use crate::acp::{ + acp_agent, + acp_client, +}; + #[derive(Debug, Clone, Parser)] pub struct CliArgs { #[command(subcommand)] @@ -61,12 +66,21 @@ impl CliArgs { pub enum RootSubcommand { /// Run a single prompt Run(RunArgs), + /// Test ACP client + AcpClient { + /// Path to the ACP agent executable + agent_path: String, + }, + /// ACP server + Acp, } impl RootSubcommand { pub async fn execute(self) -> Result { match self { RootSubcommand::Run(run_args) => run_args.execute().await, + RootSubcommand::AcpClient { agent_path } => acp_client::execute(agent_path).await, + RootSubcommand::Acp => acp_agent::execute().await, } } } diff --git a/crates/agent/src/main.rs b/crates/agent/src/main.rs index 64127a8fe2..6356a9da1e 100644 --- a/crates/agent/src/main.rs +++ b/crates/agent/src/main.rs @@ -1,3 +1,4 @@ +mod acp; mod cli; use std::process::ExitCode; From df40e99b7f1e682e03214a69bc1c9952ea8dc8e3 Mon Sep 17 00:00:00 2001 From: Erben Mo Date: Thu, 6 Nov 2025 15:39:33 -0800 Subject: [PATCH 2/5] Support diff view in fs_write --- crates/agent/src/acp/acp_agent.rs | 154 +++++++++++++++++------------- 1 file changed, 88 insertions(+), 66 deletions(-) diff --git a/crates/agent/src/acp/acp_agent.rs b/crates/agent/src/acp/acp_agent.rs index 4c79a6f5c2..19bd592098 100644 --- a/crates/agent/src/acp/acp_agent.rs +++ b/crates/agent/src/acp/acp_agent.rs @@ -14,58 +14,21 @@ use std::sync::Arc; use agent::agent_loop::types::ToolUseBlock; use agent::api_client::ApiClient; use agent::mcp::McpManager; -use agent::protocol::{ - AgentEvent, - AgentStopReason, - ContentChunk, - SendPromptArgs, - UpdateEvent, -}; -use agent::rts::{ - RtsModel, - RtsModelState, -}; +use agent::protocol::{AgentEvent, AgentStopReason, ContentChunk, SendPromptArgs, UpdateEvent}; +use agent::rts::{RtsModel, RtsModelState}; +use agent::tools::BuiltInToolName; use agent::types::AgentSnapshot; -use agent::{ - Agent, - AgentHandle, -}; +use agent::{Agent, AgentHandle}; use eyre::Result; use sacp::{ - AgentCapabilities, - CancelNotification, - ContentBlock, - ContentChunk as SacpContentChunk, - Implementation, - InitializeRequest, - InitializeResponse, - JrConnection, - JrRequestCx, - NewSessionRequest, - NewSessionResponse, - PermissionOption, - PermissionOptionId, - PermissionOptionKind, - PromptRequest, - PromptResponse, - RequestPermissionRequest, - SessionId, - SessionNotification, - SessionUpdate, - StopReason, - TextContent, - ToolCall, - ToolCallId, - ToolCallStatus, - ToolCallUpdate, - ToolCallUpdateFields, - ToolKind, - V1, -}; -use tokio_util::compat::{ - TokioAsyncReadCompatExt, - TokioAsyncWriteCompatExt, + AgentCapabilities, CancelNotification, ContentBlock, ContentChunk as SacpContentChunk, Diff, Implementation, + InitializeRequest, InitializeResponse, JrConnection, JrRequestCx, NewSessionRequest, NewSessionResponse, + PermissionOption, PermissionOptionId, PermissionOptionKind, PromptRequest, PromptResponse, + RequestPermissionRequest, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, + ToolCallContent, ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields, ToolKind, V1, }; +use std::str::FromStr; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use tracing::info; /// ACP Session that processes requests using Amazon Q agent @@ -115,14 +78,7 @@ impl AcpSession { match agent.recv().await { Ok(event) => match event { AgentEvent::Update(update_event) => { - // Forward updates to ACP client via notifications - if let Some(session_update) = convert_update_event(update_event) { - request_cx.send_notification(SessionNotification { - session_id: session_id.clone(), - update: session_update, - meta: None, - })?; - } + handle_update_event(update_event, session_id.clone(), &request_cx)?; }, AgentEvent::ApprovalRequest { id, tool_use, context } => { info!( @@ -184,9 +140,13 @@ impl AcpSession { } } -/// Convert agent UpdateEvent to ACP SessionUpdate -fn convert_update_event(update_event: UpdateEvent) -> Option { - match update_event { +/// Handle agent update event and forward to ACP client +fn handle_update_event( + update_event: UpdateEvent, + session_id: SessionId, + request_cx: &JrRequestCx, +) -> Result<(), sacp::Error> { + let session_update = match update_event { UpdateEvent::AgentContent(ContentChunk::Text(text)) => { Some(SessionUpdate::AgentMessageChunk(SacpContentChunk { content: ContentBlock::Text(TextContent { @@ -198,20 +158,82 @@ fn convert_update_event(update_event: UpdateEvent) -> Option { })) }, UpdateEvent::ToolCall(tool_call) => { - let sacp_tool_call = ToolCall { + let acp_tool_call = ToolCall { id: ToolCallId(tool_call.id.into()), title: tool_call.tool_use_block.name.clone(), - kind: ToolKind::default(), - status: ToolCallStatus::Pending, - content: vec![], - locations: vec![], + kind: get_tool_kind(&tool_call.tool_use_block.name), + status: ToolCallStatus::InProgress, + content: get_tool_content(&tool_call.tool_use_block.name, &tool_call.tool_use_block.input), + locations: vec![], // TODO: We need line number for fs_write raw_input: Some(tool_call.tool_use_block.input.clone()), raw_output: None, meta: None, }; - Some(SessionUpdate::ToolCall(sacp_tool_call)) + Some(SessionUpdate::ToolCall(acp_tool_call)) }, - _ => None, // Skip other events + _ => None, + }; + + if let Some(update) = session_update { + request_cx.send_notification(SessionNotification { + session_id, + update, + meta: None, + })?; + } + Ok(()) +} + +/// Get ToolKind for a tool name +fn get_tool_kind(tool_name: &str) -> ToolKind { + if let Ok(builtin_tool) = BuiltInToolName::from_str(tool_name) { + match builtin_tool { + BuiltInToolName::FsRead => ToolKind::Read, + BuiltInToolName::FsWrite => ToolKind::Edit, + BuiltInToolName::ExecuteCmd => ToolKind::Execute, + BuiltInToolName::ImageRead => ToolKind::Read, + BuiltInToolName::Ls => ToolKind::Read, + } + } else { + ToolKind::Other + } +} + +/// Get content for tool calls based on tool type +fn get_tool_content(tool_name: &str, input: &serde_json::Value) -> Vec { + if let Ok(builtin_tool) = BuiltInToolName::from_str(tool_name) { + match builtin_tool { + BuiltInToolName::FsWrite => { + // for fs_write we need to populate "Diff" content + let path = input["path"].as_str().unwrap(); + let command = input["command"].as_str().unwrap(); + + let (old_text, new_text) = match command { + "create" => { + let content = input["content"].as_str().unwrap().to_string(); + (None, content) + }, + "strReplace" => { + let old_str = input["oldStr"].as_str().unwrap().to_string(); + let new_str = input["newStr"].as_str().unwrap().to_string(); + (Some(old_str), new_str) + }, + _ => return vec![], + }; + + vec![ToolCallContent::Diff { + diff: Diff { + path: path.into(), + old_text, + new_text, + meta: None, + }, + }] + }, + _ => vec![], + } + } else { + vec![] } } From 3e2ba878d9b2f1e2c0c5869cd633ca5b012ab822 Mon Sep 17 00:00:00 2001 From: Erben Mo Date: Thu, 6 Nov 2025 21:31:56 -0800 Subject: [PATCH 3/5] Add Agent event and ACP event for ToolCallFinished --- crates/agent/src/acp/acp_agent.rs | 81 ++++++++++++++++++++++++++---- crates/agent/src/acp/acp_client.rs | 3 ++ crates/agent/src/agent/mod.rs | 24 ++++++++- 3 files changed, 97 insertions(+), 11 deletions(-) diff --git a/crates/agent/src/acp/acp_agent.rs b/crates/agent/src/acp/acp_agent.rs index 19bd592098..4c5a650308 100644 --- a/crates/agent/src/acp/acp_agent.rs +++ b/crates/agent/src/acp/acp_agent.rs @@ -9,26 +9,68 @@ use std::cell::RefCell; use std::collections::HashMap; use std::process::ExitCode; use std::rc::Rc; +use std::str::FromStr; use std::sync::Arc; use agent::agent_loop::types::ToolUseBlock; use agent::api_client::ApiClient; use agent::mcp::McpManager; -use agent::protocol::{AgentEvent, AgentStopReason, ContentChunk, SendPromptArgs, UpdateEvent}; -use agent::rts::{RtsModel, RtsModelState}; +use agent::protocol::{ + AgentEvent, + AgentStopReason, + ContentChunk, + SendPromptArgs, + ToolCallResult, + UpdateEvent, +}; +use agent::rts::{ + RtsModel, + RtsModelState, +}; use agent::tools::BuiltInToolName; use agent::types::AgentSnapshot; -use agent::{Agent, AgentHandle}; +use agent::{ + Agent, + AgentHandle, +}; use eyre::Result; use sacp::{ - AgentCapabilities, CancelNotification, ContentBlock, ContentChunk as SacpContentChunk, Diff, Implementation, - InitializeRequest, InitializeResponse, JrConnection, JrRequestCx, NewSessionRequest, NewSessionResponse, - PermissionOption, PermissionOptionId, PermissionOptionKind, PromptRequest, PromptResponse, - RequestPermissionRequest, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, - ToolCallContent, ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields, ToolKind, V1, + AgentCapabilities, + CancelNotification, + ContentBlock, + ContentChunk as SacpContentChunk, + Diff, + Implementation, + InitializeRequest, + InitializeResponse, + JrConnection, + JrRequestCx, + NewSessionRequest, + NewSessionResponse, + PermissionOption, + PermissionOptionId, + PermissionOptionKind, + PromptRequest, + PromptResponse, + RequestPermissionRequest, + SessionId, + SessionNotification, + SessionUpdate, + StopReason, + TextContent, + ToolCall, + ToolCallContent, + ToolCallId, + ToolCallStatus, + ToolCallUpdate, + ToolCallUpdateFields, + ToolKind, + V1, +}; +use tokio_util::compat::{ + TokioAsyncReadCompatExt, + TokioAsyncWriteCompatExt, }; -use std::str::FromStr; -use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use tracing::info; /// ACP Session that processes requests using Amazon Q agent @@ -171,6 +213,25 @@ fn handle_update_event( }; Some(SessionUpdate::ToolCall(acp_tool_call)) }, + UpdateEvent::ToolCallFinished { tool_call, result } => { + let (status, raw_output) = match result { + ToolCallResult::Success(output) => (ToolCallStatus::Completed, serde_json::to_value(output).ok()), + ToolCallResult::Error(_) => (ToolCallStatus::Failed, None), + ToolCallResult::Cancelled => (ToolCallStatus::Failed, None), + }; + Some(SessionUpdate::ToolCallUpdate(ToolCallUpdate { + id: ToolCallId(tool_call.id.into()), + fields: ToolCallUpdateFields { + status: Some(status), + title: Some(tool_call.tool_use_block.name.clone()), + kind: Some(get_tool_kind(&tool_call.tool_use_block.name)), + raw_input: Some(tool_call.tool_use_block.input.clone()), + raw_output, + ..Default::default() + }, + meta: None, + })) + }, _ => None, }; diff --git a/crates/agent/src/acp/acp_client.rs b/crates/agent/src/acp/acp_client.rs index 3bb3f3003d..1468c45e25 100644 --- a/crates/agent/src/acp/acp_client.rs +++ b/crates/agent/src/acp/acp_client.rs @@ -31,6 +31,9 @@ impl acp::Client for AcpClient { acp::SessionUpdate::ToolCall(tool_call) => { println!("🔧 Tool Call: {:#?}", tool_call); }, + acp::SessionUpdate::ToolCallUpdate(tool_call_update) => { + println!("✅ Tool Call Update: {:#?}", tool_call_update); + }, _ => { // Handle other session update types if needed }, diff --git a/crates/agent/src/agent/mod.rs b/crates/agent/src/agent/mod.rs index 45c8ab9c93..9a1042a904 100644 --- a/crates/agent/src/agent/mod.rs +++ b/crates/agent/src/agent/mod.rs @@ -73,6 +73,7 @@ use protocol::{ SendApprovalResultArgs, SendPromptArgs, ToolCall, + ToolCallResult, UpdateEvent, }; use serde::{ @@ -1172,7 +1173,28 @@ impl Agent { debug_assert!(executing_tools.get_tool(&evt.id).is_some()); if let Some(tool) = executing_tools.get_tool_mut(&evt.id) { - tool.result = Some(evt.result); + tool.result = Some(evt.result.clone()); + + // Emit ToolCallFinished event for the completed tool + let tool_call = ToolCall { + id: tool.tool_use_block.tool_use_id.clone(), + tool: tool.tool.clone(), + tool_use_block: tool.tool_use_block.clone(), + }; + + let result = match &evt.result { + ToolExecutorResult::Completed { result: Ok(output), .. } => { + ToolCallResult::Success(output.clone()) + } + ToolExecutorResult::Completed { result: Err(error), .. } => { + ToolCallResult::Error(error.clone()) + } + ToolExecutorResult::Cancelled { .. } => { + ToolCallResult::Cancelled + } + }; + + self.agent_event_buf.push(AgentEvent::Update(UpdateEvent::ToolCallFinished { tool_call, result })); } if !executing_tools.all_tools_finished() { From 41c0e01befcb9f5b85b1846b58d50c937b87479c Mon Sep 17 00:00:00 2001 From: Erben Mo Date: Fri, 7 Nov 2025 14:30:26 -0800 Subject: [PATCH 4/5] return early if logging is OFF so we don't create chat.log file unnecessarily --- crates/agent/src/cli/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/agent/src/cli/mod.rs b/crates/agent/src/cli/mod.rs index 120b22c283..40a9736e3f 100644 --- a/crates/agent/src/cli/mod.rs +++ b/crates/agent/src/cli/mod.rs @@ -52,6 +52,14 @@ impl CliArgs { fn setup_logging() -> Result { let env_filter = EnvFilter::try_from_default_env().unwrap_or_default(); + + // No logging configured, return dummy guard + let max_level = env_filter.max_level_hint(); + if max_level.is_none() || max_level == Some(tracing::level_filters::LevelFilter::OFF) { + let (_, guard) = NonBlocking::new(std::io::sink()); + return Ok(guard); + } + let (non_blocking, _file_guard) = NonBlocking::new(RollingFileAppender::new(Rotation::NEVER, ".", "chat.log")); let file_layer = tracing_subscriber::fmt::layer().with_writer(non_blocking); // .with_ansi(false); From 7228995f8d0a7b27a092f48b2191ce739e15ae41 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Sat, 15 Nov 2025 13:29:57 -0500 Subject: [PATCH 5/5] Upgrade from scap 0.1 to sacp 1.0 - Update import paths: types moved to sacp::schema module - Replace JrConnection::new() with JrHandlerChain builder API - Change Rc> to Arc> for Send compatibility - Use connection_cx() for cloning and sending messages from JrRequestCx --- Cargo.lock | 6 +- crates/agent/Cargo.toml | 3 +- crates/agent/src/acp/acp_agent.rs | 99 +++++++++---------------------- 3 files changed, 32 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63ae39ee9f..9532593a6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6455,9 +6455,9 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" [[package]] name = "sacp" -version = "0.1.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c65796fbe83e5f154c3db148b384c9305b64e9b23b5b2eaff761e0dd127e3556" +checksum = "bf399dcd5f5c38c3c0c0c70ecc5dbda22a92a029e733061eb707d518bf987b88" dependencies = [ "agent-client-protocol-schema", "anyhow", @@ -6466,7 +6466,7 @@ dependencies = [ "jsonrpcmsg", "serde", "serde_json", - "thiserror 1.0.69", + "thiserror 2.0.17", "tracing", "uuid", ] diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml index 0d35c1537d..5ab2fe61c1 100644 --- a/crates/agent/Cargo.toml +++ b/crates/agent/Cargo.toml @@ -56,7 +56,7 @@ rmcp = { version = "0.8.0", features = ["client", "transport-async-rw", "transpo rusqlite.workspace = true rustls.workspace = true rustls-native-certs.workspace = true -sacp = "0.1" +sacp = "1.0" schemars = "1.0.4" semver.workspace = true serde.workspace = true @@ -97,4 +97,3 @@ tracing-test.workspace = true [lints] workspace = true - diff --git a/crates/agent/src/acp/acp_agent.rs b/crates/agent/src/acp/acp_agent.rs index 4c5a650308..beedbe305b 100644 --- a/crates/agent/src/acp/acp_agent.rs +++ b/crates/agent/src/acp/acp_agent.rs @@ -5,72 +5,29 @@ //! cargo run -p agent -- acp //! ``` -use std::cell::RefCell; use std::collections::HashMap; use std::process::ExitCode; -use std::rc::Rc; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use agent::agent_loop::types::ToolUseBlock; use agent::api_client::ApiClient; use agent::mcp::McpManager; -use agent::protocol::{ - AgentEvent, - AgentStopReason, - ContentChunk, - SendPromptArgs, - ToolCallResult, - UpdateEvent, -}; -use agent::rts::{ - RtsModel, - RtsModelState, -}; +use agent::protocol::{AgentEvent, AgentStopReason, ContentChunk, SendPromptArgs, ToolCallResult, UpdateEvent}; +use agent::rts::{RtsModel, RtsModelState}; use agent::tools::BuiltInToolName; use agent::types::AgentSnapshot; -use agent::{ - Agent, - AgentHandle, -}; +use agent::{Agent, AgentHandle}; use eyre::Result; -use sacp::{ - AgentCapabilities, - CancelNotification, - ContentBlock, - ContentChunk as SacpContentChunk, - Diff, - Implementation, - InitializeRequest, - InitializeResponse, - JrConnection, - JrRequestCx, - NewSessionRequest, - NewSessionResponse, - PermissionOption, - PermissionOptionId, - PermissionOptionKind, - PromptRequest, - PromptResponse, - RequestPermissionRequest, - SessionId, - SessionNotification, - SessionUpdate, - StopReason, - TextContent, - ToolCall, - ToolCallContent, - ToolCallId, - ToolCallStatus, - ToolCallUpdate, - ToolCallUpdateFields, - ToolKind, - V1, -}; -use tokio_util::compat::{ - TokioAsyncReadCompatExt, - TokioAsyncWriteCompatExt, +use sacp::schema::{ + AgentCapabilities, CancelNotification, ContentBlock, ContentChunk as SacpContentChunk, Diff, Implementation, + InitializeRequest, InitializeResponse, NewSessionRequest, NewSessionResponse, PermissionOption, PermissionOptionId, + PermissionOptionKind, PromptRequest, PromptResponse, RequestPermissionOutcome, RequestPermissionRequest, SessionId, + SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent, ToolCallId, ToolCallStatus, + ToolCallUpdate, ToolCallUpdateFields, ToolKind, V1, }; +use sacp::{JrHandlerChain, JrRequestCx}; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use tracing::info; /// ACP Session that processes requests using Amazon Q agent @@ -115,7 +72,7 @@ impl AcpSession { // We want to avoid blocking the main event loop because it needs to do other work! // so spawn a new task and wait for end of turn - let _ = request_cx.clone().spawn(async move { + let _ = request_cx.connection_cx().spawn(async move { loop { match agent.recv().await { Ok(event) => match event { @@ -236,7 +193,7 @@ fn handle_update_event( }; if let Some(update) = session_update { - request_cx.send_notification(SessionNotification { + request_cx.connection_cx().send_notification(SessionNotification { session_id, update, meta: None, @@ -336,19 +293,20 @@ fn handle_approval_request( }; request_cx + .connection_cx() .send_request(permission_request) .await_when_result_received(|result| async move { info!("Permission request result: {:?}", result); let approval_result = match result { Ok(response) => match &response.outcome { - sacp::RequestPermissionOutcome::Selected { option_id } => { + RequestPermissionOutcome::Selected { option_id } => { if option_id.0.as_ref() == "allow" { agent::protocol::ApprovalResult::Approve } else { agent::protocol::ApprovalResult::Deny { reason: None } } }, - sacp::RequestPermissionOutcome::Cancelled => agent::protocol::ApprovalResult::Deny { + RequestPermissionOutcome::Cancelled => agent::protocol::ApprovalResult::Deny { reason: Some("Cancelled".to_string()), }, }, @@ -373,13 +331,14 @@ pub async fn execute() -> Result { let incoming = tokio::io::stdin().compat(); // Create session manager - let sessions = Rc::new(RefCell::new(HashMap::new())); + let sessions = Arc::new(Mutex::new(HashMap::new())); let local_set = tokio::task::LocalSet::new(); local_set .run_until(async move { // Create SACP connection with handlers - let connection = JrConnection::new(outgoing, incoming) + JrHandlerChain::new() + .name("amazon-q-agent") // Handle initialize request .on_receive_request({ async move |_request: InitializeRequest, request_cx| { @@ -398,12 +357,12 @@ pub async fn execute() -> Result { }) // Handle new_session request .on_receive_request({ - let sessions = Rc::clone(&sessions); + let sessions = Arc::clone(&sessions); async move |_request: NewSessionRequest, request_cx| { let session_id = SessionId(uuid::Uuid::new_v4().to_string().into()); - let session = Rc::new(AcpSession::new().await.map_err(|_| sacp::Error::internal_error())?); + let session = Arc::new(AcpSession::new().await.map_err(|_| sacp::Error::internal_error())?); - sessions.borrow_mut().insert(session_id.clone(), session); + sessions.lock().expect("not poisoned").insert(session_id.clone(), session); request_cx.respond(NewSessionResponse { session_id, @@ -414,9 +373,9 @@ pub async fn execute() -> Result { }) // Handle prompt request .on_receive_request({ - let sessions = Rc::clone(&sessions); + let sessions = Arc::clone(&sessions); async move |request: PromptRequest, request_cx| { - let session = sessions.borrow().get(&request.session_id).cloned(); + let session = sessions.lock().expect("not poisoned").get(&request.session_id).cloned(); match session { Some(session) => session.handle_prompt_request(request, request_cx).await, @@ -430,11 +389,9 @@ pub async fn execute() -> Result { // TODO: Implement cancellation if needed Ok(()) } - }); - - // Run the connection - connection - .serve() + }) + // Run the connection + .serve(sacp::ByteStreams::new(outgoing, incoming)) .await .map_err(|e| eyre::eyre!("Connection error: {}", e)) })