diff --git a/.gitignore b/.gitignore index 69386d9..3a0e8fe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Generated by Cargo # will have compiled files and executables /target/ +/node_modules/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html @@ -13,4 +14,4 @@ .idea *.iml -Cargo.lock \ No newline at end of file +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 0005a5d..7a9a942 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,17 @@ name = "schema" path = "examples/schema.rs" required-features = ["schemars"] +[[example]] +name = "cf_workers" +path = "examples/cf_workers.rs" +required-features = ["cf_workers"] + +[lib] +crate-type = ["cdylib", "rlib"] + [features] default = ["http_server", "rand", "uuid", "tracing-span-filter"] +cf_workers = ["worker", "web-sys", "http-body-util", "getrandom"] hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] tracing-span-filter = ["dep:tracing-subscriber"] @@ -28,8 +37,6 @@ bytes = "1.10" futures = "0.3" http = "1.3" http-body-util = { version = "0.1", optional = true } -hyper = { version = "1.6", optional = true} -hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true } pin-project-lite = "0.2" rand = { version = "0.9", optional = true } regress = "0.10" @@ -43,14 +50,26 @@ tokio = { version = "1.44", default-features = false, features = ["sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["registry"], optional = true } uuid = { version = "1.16.0", optional = true } +web-sys = {version = "0.3.70", optional = true} +worker = {version = "0.6.0", features=["http"], optional = true } +hyper = { version = "1.6", optional = true} +hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true } +getrandom = { version = "0.3.3", features = ["wasm_js"], optional = true} +futures-util = "0.3.31" +wasm-streams = "0.4.2" +async-stream = "0.3.6" +wasm-bindgen-futures = "0.4" [dev-dependencies] -tokio = { version = "1", features = ["full"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] } trybuild = "1.0" -reqwest = { version = "0.12", features = ["json"] } rand = "0.9" schemars = "1.0.0-alpha.17" +wasm-bindgen-test = "0.3" + +[target.'cfg(not(target_family = "wasm"))'.dev-dependencies] +reqwest = { version = "0.12", features = ["json"] } +tokio = { version = "1", features = ["full"] } [build-dependencies] jsonptr = "0.5.1" diff --git a/examples/cf_workers.rs b/examples/cf_workers.rs new file mode 100644 index 0000000..08ffcb2 --- /dev/null +++ b/examples/cf_workers.rs @@ -0,0 +1,40 @@ +#![cfg(target_family = "wasm")] + +// this is an example of a Rust file that uses the restate-sdk crate to create a Cloudflare Worker service. +// this code needs to be compiled to WebAssembly using the wrangler dev tools with extra rust flags to enable wasm support +// +// for local development use this command: +// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler dev +// +// or push to production using this command: +// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler deploy +// +// The Cloudflare Worker automated build pipeline doesn't currently support this code due to missing clang binaries +// + +use restate_sdk::prelude::*; + +#[restate_sdk::service] +trait MyService { + async fn my_handler() -> HandlerResult<()>; +} + +struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> { + Ok(()) + } +} + +#[worker::event(fetch)] +pub async fn main( req:worker::HttpRequest, _env: worker::Env, _ctx: worker::Context) -> worker::Result> { + let endpoint = Endpoint::builder() + .with_protocol_mode(restate_sdk::discovery::ProtocolMode::RequestResponse) // Cloudflare Workers don't support bidi streams + .bind(MyServiceImpl.serve()) + .build(); + + let cf_worker = CfWorkerServer::new(endpoint); + + return cf_worker.call(req).await; +} diff --git a/src/cf_workers.rs b/src/cf_workers.rs new file mode 100644 index 0000000..b6e2027 --- /dev/null +++ b/src/cf_workers.rs @@ -0,0 +1,149 @@ + +use std::str::FromStr; +use http::{HeaderName, HeaderValue, StatusCode}; +use tokio::sync::mpsc; +use web_sys::{js_sys::Uint8Array, Reflect, Object, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, + ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader}; +use wasm_bindgen_futures; +use worker::*; +use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint}; + +// Convert Bytes to ReadableStream using Web API bindings +fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result { + let underlying_source = Object::new(); + + let start_closure = Closure::wrap(Box::new(move |controller: ReadableStreamDefaultController| { + // Convert bytes to Uint8Array + let uint8_array = Uint8Array::new_with_length(data.len() as u32); + uint8_array.copy_from(&data[..]); + + // Enqueue the data + let _ = controller.enqueue_with_chunk(&uint8_array); + let _ = controller.close(); + + }) as Box); + + Reflect::set( + &underlying_source, + &JsValue::from_str("start"), + start_closure.as_ref().unchecked_ref(), + )?; + + start_closure.forget(); // Prevent cleanup + + ReadableStream::new_with_underlying_source(&underlying_source) +} + +/// Cloudflare Worker server to expose Restate services. +pub struct CfWorkerServer { + endpoint: Endpoint, +} + +impl From for CfWorkerServer { + fn from(endpoint: Endpoint) -> Self { + Self { endpoint } + } +} + +impl CfWorkerServer { + + pub fn new(endpoint: Endpoint) -> Self { + Self { endpoint } + } + + pub async fn call(&self, req: HttpRequest) -> worker::Result> { + let headers = req.headers().to_owned(); + let (parts, request_body) = req.into_parts(); + let result = self.endpoint.resolve(parts.uri.path(), headers); + + if let Ok(response) = result { + match response { + crate::endpoint::Response::ReplyNow { status_code, headers, body } => { + + // convert outbound body data from a buffer into a readable stream + let readable_stream = bytes_to_readable_stream(body)?; + let mut http_response = http::Response::builder() + .status(status_code) + .body(Body::new(readable_stream))?; + + for header in headers { + let key = HeaderName::from_str(header.key.as_ref())?; + let value = HeaderValue::from_str(header.value.as_ref())?; + http_response.headers_mut().insert(key, value); + } + + Ok(http_response) + } + crate::endpoint::Response::BidiStream { status_code, headers, handler} => { + + // Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams + // Implenting this as a workaround using existing handler api, expecting a hyper request stream + // Reads entire request/reponse body to proxy data across WASM boundary + + let js_stream: ReadableStream = request_body.into_inner().unwrap().unchecked_into(); + let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into(); + + + // Drain the inbound Request API body data from a Stream API ReadableStream into a buffer + let mut request_body = Vec::new(); + loop { + let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await; + match read_result { + Ok(js_value) => { + let done = js_sys::Reflect::get(&js_value, &JsValue::from_str("done")).unwrap(); + if done.as_bool().unwrap_or(false) { + break; + } + let value = js_sys::Reflect::get(&js_value, &JsValue::from_str("value")).unwrap(); + let uint8_array: Uint8Array = value.unchecked_into(); + let mut bytes = vec![0u8; uint8_array.length() as usize]; + uint8_array.copy_to(&mut bytes); + request_body.extend_from_slice(&bytes); + } + Err(_) => break, + } + } + + // Create a Rust stream from the collected bytes -- this maps to existing restate sdk stream implementation for hanlders + let request_bytes = bytes::Bytes::from(request_body); + let stream = futures_util::stream::once(async move { + Ok::>(request_bytes) + }); + + let input_receiver = InputReceiver::from_stream(stream); + + let (output_tx, output_rx) = mpsc::unbounded_channel(); + let output_sender = OutputSender::from_channel(output_tx); + + // Execute handler and collect output + let _ = handler.handle(input_receiver, output_sender).await; + + // Collect all output chunks to proxy body response across WASM boundary + let mut response_body = Vec::new(); + let mut rx = output_rx; + while let Some(chunk) = rx.recv().await { + response_body.extend_from_slice(&chunk); + } + + // Build HTTP Response-- converting outbound buffer into readable stream + let readable_stream = bytes_to_readable_stream(bytes::Bytes::from(response_body))?; + let mut http_response = http::Response::builder() + .status(status_code) + .body(Body::new(readable_stream))?; + + for header in headers { + let key = HeaderName::from_str(header.key.as_ref())?; + let value = HeaderValue::from_str(header.value.as_ref())?; + http_response.headers_mut().insert(key, value); + } + + Ok(http_response) + }, + } + } + else { + let http_response = http::Response::builder().status(StatusCode::BAD_REQUEST).body(worker::Body::empty())?; + Ok(http_response) + } + } +} diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 15001bf..1540056 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -386,6 +386,11 @@ impl Builder { Self::default() } + pub fn with_protocol_mode(mut self, mode: crate::discovery::ProtocolMode) -> Self { + self.discovery.protocol_mode = Some(mode); + self + } + /// Add a [`Service`] to this endpoint. /// /// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros, diff --git a/src/lib.rs b/src/lib.rs index 195017a..8cf5e2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,6 +226,8 @@ pub mod filter; pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; +#[cfg(feature = "cf_workers")] +pub mod cf_workers; pub mod serde; /// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1). @@ -509,6 +511,9 @@ pub mod prelude { #[cfg(feature = "http_server")] pub use crate::http_server::HttpServer; + #[cfg(feature = "cf_workers")] + pub use crate::cf_workers::CfWorkerServer; + pub use crate::context::{ CallFuture, Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState, ContextSideEffects, ContextTimers, ContextWriteState, HeaderMap, InvocationHandle, diff --git a/tests/cf_workers.rs b/tests/cf_workers.rs new file mode 100644 index 0000000..0c58e7a --- /dev/null +++ b/tests/cf_workers.rs @@ -0,0 +1,31 @@ +#![cfg(target_family = "wasm")] + +// test requires wasm-pack / wasm-bindgen-test framework +// use following command to execute test: +// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' wasm-pack test --node -- --no-default-features --features cf_workers --test cf_workers +use restate_sdk::{cf_workers::CfWorkerServer, prelude::*}; +use wasm_bindgen_test::*; + +#[restate_sdk::service] +trait MyService { + async fn my_handler() -> HandlerResult<()>; +} + +struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> { + Ok(()) + } +} + +#[wasm_bindgen_test] +fn cf_workerservice_handler() { + let endpoint = Endpoint::builder() + .bind(MyServiceImpl.serve()) + .build(); + + let cf_server = CfWorkerServer::new(endpoint); + let health_check_request = http::Request::builder().uri("/health").body(worker::Body::empty()).unwrap(); + let result = cf_server.call(health_check_request); +}