From 423748b7a908326539e30c7cce64c6bc52fc5c11 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Sun, 29 Jun 2025 15:10:37 -0400 Subject: [PATCH 01/11] initial commit for cloudflare workers feature --- .gitignore | 3 +- Cargo.toml | 18 +++++++--- src/cf_workers.rs | 85 +++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 5 +++ tests/cf_workers.rs | 33 ++++++++++++++++++ 5 files changed, 139 insertions(+), 5 deletions(-) create mode 100644 src/cf_workers.rs create mode 100644 tests/cf_workers.rs diff --git a/.gitignore b/.gitignore index 69386d9..3a0e8fe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Generated by Cargo # will have compiled files and executables /target/ +/node_modules/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html @@ -13,4 +14,4 @@ .idea *.iml -Cargo.lock \ No newline at end of file +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 0005a5d..3e18f05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,12 @@ name = "schema" path = "examples/schema.rs" required-features = ["schemars"] +[lib] +crate-type = ["cdylib", "rlib"] + [features] default = ["http_server", "rand", "uuid", "tracing-span-filter"] +cf_workers = ["worker", "web-sys", "http-body-util", "getrandom"] hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] tracing-span-filter = ["dep:tracing-subscriber"] @@ -28,8 +32,6 @@ bytes = "1.10" futures = "0.3" http = "1.3" http-body-util = { version = "0.1", optional = true } -hyper = { version = "1.6", optional = true} -hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true } pin-project-lite = "0.2" rand = { version = "0.9", optional = true } regress = "0.10" @@ -43,14 +45,22 @@ tokio = { version = "1.44", default-features = false, features = ["sync"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["registry"], optional = true } uuid = { version = "1.16.0", optional = true } +web-sys = {version = "0.3.70", optional = true} +worker = {version = "0.6.0", features=["http"], optional = true } +hyper = { version = "1.6", optional = true} +hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true } +getrandom = { version = "0.3.3", features = ["wasm_js"], optional = true} [dev-dependencies] -tokio = { version = "1", features = ["full"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] } trybuild = "1.0" -reqwest = { version = "0.12", features = ["json"] } rand = "0.9" schemars = "1.0.0-alpha.17" +wasm-bindgen-test = "0.3" + +[target.'cfg(not(target_family = "wasm"))'.dev-dependencies] +reqwest = { version = "0.12", features = ["json"] } +tokio = { version = "1", features = ["full"] } [build-dependencies] jsonptr = "0.5.1" diff --git a/src/cf_workers.rs b/src/cf_workers.rs new file mode 100644 index 0000000..61aa936 --- /dev/null +++ b/src/cf_workers.rs @@ -0,0 +1,85 @@ + +use std::str::FromStr; +use http::{HeaderName, HeaderValue, StatusCode}; +use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, ReadableStream, ReadableStreamDefaultController}; +use worker::*; +use crate::prelude::Endpoint; + +// Convert Bytes to ReadableStream using Web API bindings +fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result { + let underlying_source = js_sys::Object::new(); + + let start_closure = Closure::wrap(Box::new(move |controller: ReadableStreamDefaultController| { + // Convert bytes to Uint8Array + let uint8_array = Uint8Array::new_with_length(data.len() as u32); + uint8_array.copy_from(&data[..]); + + // Enqueue the data + let _ = controller.enqueue_with_chunk(&uint8_array); + let _ = controller.close(); + + }) as Box); + + js_sys::Reflect::set( + &underlying_source, + &JsValue::from_str("start"), + start_closure.as_ref().unchecked_ref(), + )?; + + start_closure.forget(); // Prevent cleanup + + ReadableStream::new_with_underlying_source(&underlying_source) +} + +/// Http server to expose your Restate services. +pub struct CfWorkerServer { + endpoint: Endpoint, +} + +impl From for CfWorkerServer { + fn from(endpoint: Endpoint) -> Self { + Self { endpoint } + } +} + +impl CfWorkerServer { + /// Create new [`HttpServer`] from an [`Endpoint`]. + pub fn new(endpoint: Endpoint) -> Self { + Self { endpoint } + } + + pub fn call(&self, req: HttpRequest) -> worker::Result> { + let headers = req.headers().to_owned(); + let result = self.endpoint.resolve(req.uri().path(), headers); + + if let Ok(response) = result { + match response { + crate::endpoint::Response::ReplyNow { status_code, headers, body } => { + + let readable_stream = bytes_to_readable_stream(body)?; + let mut http_response = http::Response::builder() + .status(status_code) + .body(Body::new(readable_stream))?; + + for header in headers { + let key = HeaderName::from_str(header.key.as_ref())?; + let value = HeaderValue::from_str(header.value.as_ref())?; + http_response.headers_mut().insert(key, value); + } + + Ok(http_response) + } + crate::endpoint::Response::BidiStream { status_code:_, headers:_, handler:_ } => { + // Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams + let http_response = http::Response::builder().status(StatusCode::NOT_IMPLEMENTED).body(worker::Body::empty())?; + // have to use worker::Result not http::Result + Ok(http_response) + }, + } + } + else { + let http_response = http::Response::builder().status(StatusCode::BAD_REQUEST).body(worker::Body::empty())?; + Ok(http_response) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 195017a..8cf5e2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,6 +226,8 @@ pub mod filter; pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; +#[cfg(feature = "cf_workers")] +pub mod cf_workers; pub mod serde; /// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1). @@ -509,6 +511,9 @@ pub mod prelude { #[cfg(feature = "http_server")] pub use crate::http_server::HttpServer; + #[cfg(feature = "cf_workers")] + pub use crate::cf_workers::CfWorkerServer; + pub use crate::context::{ CallFuture, Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState, ContextSideEffects, ContextTimers, ContextWriteState, HeaderMap, InvocationHandle, diff --git a/tests/cf_workers.rs b/tests/cf_workers.rs new file mode 100644 index 0000000..3b1f01d --- /dev/null +++ b/tests/cf_workers.rs @@ -0,0 +1,33 @@ +#![cfg(target_family = "wasm")] + +// test requires wasm-pack / wasm-bindgen-test framework +// use following command to execute test: +// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' wasm-pack test --node -- --no-default-features --features cf_workers --test cf_workers +use restate_sdk::{cf_workers::CfWorkerServer, prelude::*}; +use wasm_bindgen_test::*; + +#[restate_sdk::service] +trait MyService { + async fn my_handler() -> HandlerResult<()>; +} + +struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> { + Ok(()) + } +} + +#[wasm_bindgen_test] +fn cf_workerservice_handler() { + let endpoint = Endpoint::builder() + .bind(MyServiceImpl.serve()) + .build(); + + let cf_server = CfWorkerServer::new(endpoint); + let health_check_request = http::Request::builder().uri("/health").body(worker::Body::empty()).unwrap(); + let result =cf_server.call(health_check_request); + assert!(result.is_ok()); + +} From afd836e3d8599ff022ff358f179ed2773d806c3a Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 00:37:17 -0400 Subject: [PATCH 02/11] included synchronous handler for bidi streams --- Cargo.toml | 4 ++ src/cf_workers.rs | 106 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 100 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3e18f05..644181d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,10 @@ worker = {version = "0.6.0", features=["http"], optional = true } hyper = { version = "1.6", optional = true} hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true } getrandom = { version = "0.3.3", features = ["wasm_js"], optional = true} +futures-util = "0.3.31" +wasm-streams = "0.4.2" +async-stream = "0.3.6" +wasm-bindgen-futures = "0.4" [dev-dependencies] tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] } diff --git a/src/cf_workers.rs b/src/cf_workers.rs index 61aa936..150d4eb 100644 --- a/src/cf_workers.rs +++ b/src/cf_workers.rs @@ -1,9 +1,17 @@ -use std::str::FromStr; -use http::{HeaderName, HeaderValue, StatusCode}; -use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, ReadableStream, ReadableStreamDefaultController}; +use std::{ops::Deref, str::FromStr}; +use http::{response, HeaderName, HeaderValue, StatusCode}; +use restate_sdk_shared_core::Header; +use tokio::sync::mpsc; +use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader}; +use wasm_bindgen_futures; use worker::*; -use crate::prelude::Endpoint; +use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint}; + +#[allow(clippy::declare_interior_mutable_const)] +const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server"); +const X_RESTATE_SERVER_VALUE: HeaderValue = + HeaderValue::from_static(concat!("restate-sdk-rust/", env!("CARGO_PKG_VERSION"))); // Convert Bytes to ReadableStream using Web API bindings fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result { @@ -31,6 +39,21 @@ fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result, +) -> response::Builder { + let mut response_builder = http::Response::builder() + .status(status_code) + .header(X_RESTATE_SERVER, X_RESTATE_SERVER_VALUE); + + for header in headers { + response_builder = response_builder.header(header.key.deref(), header.value.deref()); + } + + response_builder +} + /// Http server to expose your Restate services. pub struct CfWorkerServer { endpoint: Endpoint, @@ -48,9 +71,10 @@ impl CfWorkerServer { Self { endpoint } } - pub fn call(&self, req: HttpRequest) -> worker::Result> { + pub async fn call(&self, req: HttpRequest) -> worker::Result> { let headers = req.headers().to_owned(); - let result = self.endpoint.resolve(req.uri().path(), headers); + let (parts, body) = req.into_parts(); + let result = self.endpoint.resolve(parts.uri.path(), headers); if let Ok(response) = result { match response { @@ -69,11 +93,73 @@ impl CfWorkerServer { Ok(http_response) } - crate::endpoint::Response::BidiStream { status_code:_, headers:_, handler:_ } => { - // Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams - let http_response = http::Response::builder().status(StatusCode::NOT_IMPLEMENTED).body(worker::Body::empty())?; - // have to use worker::Result not http::Result + crate::endpoint::Response::BidiStream { status_code, headers, handler} => { + + // let body_stream = ReadableStream::from_raw(body.into_inner().unwrap()); + + // let stream = body_stream.into_stream(); + // + + // Read entire request body first to avoid Send issues with WebAssembly pointers + let js_stream: ReadableStream = body.into_inner().unwrap().unchecked_into(); + let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into(); + + let mut request_body = Vec::new(); + loop { + let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await; + match read_result { + Ok(js_value) => { + let done = js_sys::Reflect::get(&js_value, &JsValue::from_str("done")).unwrap(); + if done.as_bool().unwrap_or(false) { + break; + } + let value = js_sys::Reflect::get(&js_value, &JsValue::from_str("value")).unwrap(); + let uint8_array: Uint8Array = value.unchecked_into(); + let mut bytes = vec![0u8; uint8_array.length() as usize]; + uint8_array.copy_to(&mut bytes); + request_body.extend_from_slice(&bytes); + } + Err(_) => break, + } + } + + // Create a simple stream from the collected bytes + let request_bytes = bytes::Bytes::from(request_body); + let stream = futures_util::stream::once(async move { + Ok::>(request_bytes) + }); + + let input_receiver = InputReceiver::from_stream(stream); + + let (output_tx, output_rx) = mpsc::unbounded_channel(); + let output_sender = OutputSender::from_channel(output_tx); + + // Execute handler and collect output + let _ = handler.handle(input_receiver, output_sender).await; + + // Collect all output chunks + let mut response_body = Vec::new(); + let mut rx = output_rx; + while let Some(chunk) = rx.recv().await { + response_body.extend_from_slice(&chunk); + } + + let readable_stream = bytes_to_readable_stream(bytes::Bytes::from(response_body))?; + let mut http_response = http::Response::builder() + .status(status_code) + .body(Body::new(readable_stream))?; + + for header in headers { + let key = HeaderName::from_str(header.key.as_ref())?; + let value = HeaderValue::from_str(header.value.as_ref())?; + http_response.headers_mut().insert(key, value); + } + Ok(http_response) + // // Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams + // let http_response = http::Response::builder().status(StatusCode::IM_A_TEAPOT).body(worker::Body::empty())?; + // // have to use worker::Result not http::Result + // Ok(http_response) }, } } From cad43fa84348c5c754c16f8d0d21f3ee31a396c8 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 11:26:53 -0400 Subject: [PATCH 03/11] added a with_protocol_mode() flag to endpoint builder to support CLoudflare deployment --- src/endpoint/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 15001bf..1540056 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -386,6 +386,11 @@ impl Builder { Self::default() } + pub fn with_protocol_mode(mut self, mode: crate::discovery::ProtocolMode) -> Self { + self.discovery.protocol_mode = Some(mode); + self + } + /// Add a [`Service`] to this endpoint. /// /// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros, From d42bd39d71f85236a9056ca9cbd2fa911eb81657 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 12:21:48 -0400 Subject: [PATCH 04/11] clean up src/cf_workers.rs --- src/cf_workers.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/cf_workers.rs b/src/cf_workers.rs index 150d4eb..1ea8612 100644 --- a/src/cf_workers.rs +++ b/src/cf_workers.rs @@ -95,12 +95,10 @@ impl CfWorkerServer { } crate::endpoint::Response::BidiStream { status_code, headers, handler} => { - // let body_stream = ReadableStream::from_raw(body.into_inner().unwrap()); + // Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams + // Implenting this as a workaround using existing handler api, expecting a hyper request stream + // Reads entire request/reponse body to proxy data across WASM boundary - // let stream = body_stream.into_stream(); - // - - // Read entire request body first to avoid Send issues with WebAssembly pointers let js_stream: ReadableStream = body.into_inner().unwrap().unchecked_into(); let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into(); @@ -123,7 +121,7 @@ impl CfWorkerServer { } } - // Create a simple stream from the collected bytes + // Create a simple stream from the collected bytes -- this maps to existing restate sdk stream implementation for hanlders let request_bytes = bytes::Bytes::from(request_body); let stream = futures_util::stream::once(async move { Ok::>(request_bytes) @@ -137,13 +135,14 @@ impl CfWorkerServer { // Execute handler and collect output let _ = handler.handle(input_receiver, output_sender).await; - // Collect all output chunks + // Collect all output chunks to proxy body response across WASM boundary let mut response_body = Vec::new(); let mut rx = output_rx; while let Some(chunk) = rx.recv().await { response_body.extend_from_slice(&chunk); } + // Build HTTP Response from bytes let readable_stream = bytes_to_readable_stream(bytes::Bytes::from(response_body))?; let mut http_response = http::Response::builder() .status(status_code) @@ -156,10 +155,6 @@ impl CfWorkerServer { } Ok(http_response) - // // Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams - // let http_response = http::Response::builder().status(StatusCode::IM_A_TEAPOT).body(worker::Body::empty())?; - // // have to use worker::Result not http::Result - // Ok(http_response) }, } } From 2e2b307d66ea73f9272817fce26c7e611c810ace Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 12:22:35 -0400 Subject: [PATCH 05/11] add example code for cf_workers --- Cargo.toml | 5 +++++ examples/cf_workers.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 examples/cf_workers.rs diff --git a/Cargo.toml b/Cargo.toml index 644181d..7a9a942 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,11 @@ name = "schema" path = "examples/schema.rs" required-features = ["schemars"] +[[example]] +name = "cf_workers" +path = "examples/cf_workers.rs" +required-features = ["cf_workers"] + [lib] crate-type = ["cdylib", "rlib"] diff --git a/examples/cf_workers.rs b/examples/cf_workers.rs new file mode 100644 index 0000000..7e6e644 --- /dev/null +++ b/examples/cf_workers.rs @@ -0,0 +1,28 @@ +#![cfg(target_family = "wasm")] + +use restate_sdk::prelude::*; + +#[restate_sdk::service] +trait MyService { + async fn my_handler() -> HandlerResult<()>; +} + +struct MyServiceImpl; + +impl MyService for MyServiceImpl { + async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> { + Ok(()) + } +} + +#[worker::event(fetch)] +pub async fn main( req:worker::HttpRequest, _env: worker::Env, _ctx: worker::Context) -> worker::Result> { + let endpoint = Endpoint::builder() + .with_protocol_mode(restate_sdk::discovery::ProtocolMode::RequestResponse) // Cloud + .bind(MyServiceImpl.serve()) + .build(); + + let cf_worker = CfWorkerServer::new(endpoint); + + return cf_worker.call(req).await; +} From 58b60058ce89f874f1661db1509cc0b521eb610d Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 12:31:53 -0400 Subject: [PATCH 06/11] dropping dead code --- src/cf_workers.rs | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/src/cf_workers.rs b/src/cf_workers.rs index 1ea8612..09783ca 100644 --- a/src/cf_workers.rs +++ b/src/cf_workers.rs @@ -3,7 +3,8 @@ use std::{ops::Deref, str::FromStr}; use http::{response, HeaderName, HeaderValue, StatusCode}; use restate_sdk_shared_core::Header; use tokio::sync::mpsc; -use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader}; +use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, + ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader}; use wasm_bindgen_futures; use worker::*; use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint}; @@ -39,22 +40,7 @@ fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result, -) -> response::Builder { - let mut response_builder = http::Response::builder() - .status(status_code) - .header(X_RESTATE_SERVER, X_RESTATE_SERVER_VALUE); - - for header in headers { - response_builder = response_builder.header(header.key.deref(), header.value.deref()); - } - - response_builder -} - -/// Http server to expose your Restate services. +/// Cloudflare Worker server to expose Restate services. pub struct CfWorkerServer { endpoint: Endpoint, } @@ -66,7 +52,7 @@ impl From for CfWorkerServer { } impl CfWorkerServer { - /// Create new [`HttpServer`] from an [`Endpoint`]. + pub fn new(endpoint: Endpoint) -> Self { Self { endpoint } } From 238273c44104efce5e4f1affae5c4909b5cc7bc6 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 12:32:26 -0400 Subject: [PATCH 07/11] updating test to new call response --- tests/cf_workers.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/cf_workers.rs b/tests/cf_workers.rs index 3b1f01d..0c58e7a 100644 --- a/tests/cf_workers.rs +++ b/tests/cf_workers.rs @@ -27,7 +27,5 @@ fn cf_workerservice_handler() { let cf_server = CfWorkerServer::new(endpoint); let health_check_request = http::Request::builder().uri("/health").body(worker::Body::empty()).unwrap(); - let result =cf_server.call(health_check_request); - assert!(result.is_ok()); - + let result = cf_server.call(health_check_request); } From 275c4fd44ae5dbbe25662b85911f2e6fd533f6dd Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 12:36:43 -0400 Subject: [PATCH 08/11] cleaning up var names for request --- src/cf_workers.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cf_workers.rs b/src/cf_workers.rs index 09783ca..b03ec67 100644 --- a/src/cf_workers.rs +++ b/src/cf_workers.rs @@ -59,7 +59,7 @@ impl CfWorkerServer { pub async fn call(&self, req: HttpRequest) -> worker::Result> { let headers = req.headers().to_owned(); - let (parts, body) = req.into_parts(); + let (parts, request_body) = req.into_parts(); let result = self.endpoint.resolve(parts.uri.path(), headers); if let Ok(response) = result { @@ -85,7 +85,7 @@ impl CfWorkerServer { // Implenting this as a workaround using existing handler api, expecting a hyper request stream // Reads entire request/reponse body to proxy data across WASM boundary - let js_stream: ReadableStream = body.into_inner().unwrap().unchecked_into(); + let js_stream: ReadableStream = request_body.into_inner().unwrap().unchecked_into(); let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into(); let mut request_body = Vec::new(); From e47eaf5ad00979de42063bcd2215dfe74b9c28a4 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 12:39:22 -0400 Subject: [PATCH 09/11] clippy cleanup / remove unused imports --- src/cf_workers.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/cf_workers.rs b/src/cf_workers.rs index b03ec67..1aba122 100644 --- a/src/cf_workers.rs +++ b/src/cf_workers.rs @@ -1,7 +1,6 @@ -use std::{ops::Deref, str::FromStr}; -use http::{response, HeaderName, HeaderValue, StatusCode}; -use restate_sdk_shared_core::Header; +use std::str::FromStr; +use http::{HeaderName, HeaderValue, StatusCode}; use tokio::sync::mpsc; use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader}; @@ -9,11 +8,6 @@ use wasm_bindgen_futures; use worker::*; use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint}; -#[allow(clippy::declare_interior_mutable_const)] -const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server"); -const X_RESTATE_SERVER_VALUE: HeaderValue = - HeaderValue::from_static(concat!("restate-sdk-rust/", env!("CARGO_PKG_VERSION"))); - // Convert Bytes to ReadableStream using Web API bindings fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result { let underlying_source = js_sys::Object::new(); From a17ad4c148c4090db7bfe80acad92646688a56e7 Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 13:01:54 -0400 Subject: [PATCH 10/11] add comment to cf_worker example to explain build process --- examples/cf_workers.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/examples/cf_workers.rs b/examples/cf_workers.rs index 7e6e644..8e56e0d 100644 --- a/examples/cf_workers.rs +++ b/examples/cf_workers.rs @@ -1,5 +1,17 @@ #![cfg(target_family = "wasm")] +// this is an example of a Rust file that uses the restate-sdk crate to create a Cloudflare Worker service. +// this code needs to be compiled to WebAssembly using the wrangler dev tools with extra rust flags to enable wasm support +// +// for local development use this command: +// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler dev +// +// or push to production using this command: +// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler deploy +// +// The Cloudflare Worker automated build pipeline doesn't currently support this code due to missing clang binaries +// + use restate_sdk::prelude::*; #[restate_sdk::service] From 49493de1a10c50e6663778116bc90d6897b92bcd Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Mon, 30 Jun 2025 13:57:41 -0400 Subject: [PATCH 11/11] updating comments --- examples/cf_workers.rs | 2 +- src/cf_workers.rs | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/cf_workers.rs b/examples/cf_workers.rs index 8e56e0d..08ffcb2 100644 --- a/examples/cf_workers.rs +++ b/examples/cf_workers.rs @@ -30,7 +30,7 @@ impl MyService for MyServiceImpl { #[worker::event(fetch)] pub async fn main( req:worker::HttpRequest, _env: worker::Env, _ctx: worker::Context) -> worker::Result> { let endpoint = Endpoint::builder() - .with_protocol_mode(restate_sdk::discovery::ProtocolMode::RequestResponse) // Cloud + .with_protocol_mode(restate_sdk::discovery::ProtocolMode::RequestResponse) // Cloudflare Workers don't support bidi streams .bind(MyServiceImpl.serve()) .build(); diff --git a/src/cf_workers.rs b/src/cf_workers.rs index 1aba122..b6e2027 100644 --- a/src/cf_workers.rs +++ b/src/cf_workers.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use http::{HeaderName, HeaderValue, StatusCode}; use tokio::sync::mpsc; -use web_sys::{js_sys::Uint8Array, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, +use web_sys::{js_sys::Uint8Array, Reflect, Object, wasm_bindgen::{prelude::Closure, JsCast, JsValue}, ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader}; use wasm_bindgen_futures; use worker::*; @@ -10,7 +10,7 @@ use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint}; // Convert Bytes to ReadableStream using Web API bindings fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result { - let underlying_source = js_sys::Object::new(); + let underlying_source = Object::new(); let start_closure = Closure::wrap(Box::new(move |controller: ReadableStreamDefaultController| { // Convert bytes to Uint8Array @@ -23,7 +23,7 @@ fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result); - js_sys::Reflect::set( + Reflect::set( &underlying_source, &JsValue::from_str("start"), start_closure.as_ref().unchecked_ref(), @@ -60,6 +60,7 @@ impl CfWorkerServer { match response { crate::endpoint::Response::ReplyNow { status_code, headers, body } => { + // convert outbound body data from a buffer into a readable stream let readable_stream = bytes_to_readable_stream(body)?; let mut http_response = http::Response::builder() .status(status_code) @@ -82,6 +83,8 @@ impl CfWorkerServer { let js_stream: ReadableStream = request_body.into_inner().unwrap().unchecked_into(); let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into(); + + // Drain the inbound Request API body data from a Stream API ReadableStream into a buffer let mut request_body = Vec::new(); loop { let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await; @@ -101,7 +104,7 @@ impl CfWorkerServer { } } - // Create a simple stream from the collected bytes -- this maps to existing restate sdk stream implementation for hanlders + // Create a Rust stream from the collected bytes -- this maps to existing restate sdk stream implementation for hanlders let request_bytes = bytes::Bytes::from(request_body); let stream = futures_util::stream::once(async move { Ok::>(request_bytes) @@ -122,7 +125,7 @@ impl CfWorkerServer { response_body.extend_from_slice(&chunk); } - // Build HTTP Response from bytes + // Build HTTP Response-- converting outbound buffer into readable stream let readable_stream = bytes_to_readable_stream(bytes::Bytes::from(response_body))?; let mut http_response = http::Response::builder() .status(status_code)