Skip to content
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
/node_modules/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Expand All @@ -13,4 +14,4 @@
.idea
*.iml

Cargo.lock
Cargo.lock
27 changes: 23 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@ name = "schema"
path = "examples/schema.rs"
required-features = ["schemars"]

[[example]]
name = "cf_workers"
path = "examples/cf_workers.rs"
required-features = ["cf_workers"]

[lib]
crate-type = ["cdylib", "rlib"]

[features]
default = ["http_server", "rand", "uuid", "tracing-span-filter"]
cf_workers = ["worker", "web-sys", "http-body-util", "getrandom"]
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
tracing-span-filter = ["dep:tracing-subscriber"]
Expand All @@ -28,8 +37,6 @@ bytes = "1.10"
futures = "0.3"
http = "1.3"
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1.6", optional = true}
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
pin-project-lite = "0.2"
rand = { version = "0.9", optional = true }
regress = "0.10"
Expand All @@ -43,14 +50,26 @@ tokio = { version = "1.44", default-features = false, features = ["sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["registry"], optional = true }
uuid = { version = "1.16.0", optional = true }
web-sys = {version = "0.3.70", optional = true}
worker = {version = "0.6.0", features=["http"], optional = true }
hyper = { version = "1.6", optional = true}
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
getrandom = { version = "0.3.3", features = ["wasm_js"], optional = true}
futures-util = "0.3.31"
wasm-streams = "0.4.2"
async-stream = "0.3.6"
wasm-bindgen-futures = "0.4"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "registry"] }
trybuild = "1.0"
reqwest = { version = "0.12", features = ["json"] }
rand = "0.9"
schemars = "1.0.0-alpha.17"
wasm-bindgen-test = "0.3"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1", features = ["full"] }

[build-dependencies]
jsonptr = "0.5.1"
Expand Down
40 changes: 40 additions & 0 deletions examples/cf_workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#![cfg(target_family = "wasm")]

// this is an example of a Rust file that uses the restate-sdk crate to create a Cloudflare Worker service.
// this code needs to be compiled to WebAssembly using the wrangler dev tools with extra rust flags to enable wasm support
//
// for local development use this command:
// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler dev
//
// or push to production using this command:
// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' npx wrangler deploy
//
// The Cloudflare Worker automated build pipeline doesn't currently support this code due to missing clang binaries
//

use restate_sdk::prelude::*;

#[restate_sdk::service]
trait MyService {
async fn my_handler() -> HandlerResult<()>;
}

struct MyServiceImpl;

impl MyService for MyServiceImpl {
async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> {
Ok(())
}
}

Check warning on line 28 in examples/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/examples/cf_workers.rs

#[worker::event(fetch)]
pub async fn main( req:worker::HttpRequest, _env: worker::Env, _ctx: worker::Context) -> worker::Result<http::Response<worker::Body>> {
let endpoint = Endpoint::builder()
.with_protocol_mode(restate_sdk::discovery::ProtocolMode::RequestResponse) // Cloudflare Workers don't support bidi streams
.bind(MyServiceImpl.serve())
.build();

let cf_worker = CfWorkerServer::new(endpoint);

return cf_worker.call(req).await;
}
149 changes: 149 additions & 0 deletions src/cf_workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@

Check warning on line 1 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
use std::str::FromStr;
use http::{HeaderName, HeaderValue, StatusCode};
use tokio::sync::mpsc;
use web_sys::{js_sys::Uint8Array, Reflect, Object, wasm_bindgen::{prelude::Closure, JsCast, JsValue},
ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader};
use wasm_bindgen_futures;
use worker::*;
use crate::{endpoint::{InputReceiver, OutputSender}, prelude::Endpoint};

// Convert Bytes to ReadableStream using Web API bindings
fn bytes_to_readable_stream(data: bytes::Bytes) -> core::result::Result<ReadableStream, JsValue> {
let underlying_source = Object::new();

let start_closure = Closure::wrap(Box::new(move |controller: ReadableStreamDefaultController| {
// Convert bytes to Uint8Array
let uint8_array = Uint8Array::new_with_length(data.len() as u32);
uint8_array.copy_from(&data[..]);

// Enqueue the data
let _ = controller.enqueue_with_chunk(&uint8_array);
let _ = controller.close();

}) as Box<dyn FnMut(ReadableStreamDefaultController)>);

Reflect::set(
&underlying_source,
&JsValue::from_str("start"),
start_closure.as_ref().unchecked_ref(),
)?;

start_closure.forget(); // Prevent cleanup

ReadableStream::new_with_underlying_source(&underlying_source)
}

/// Cloudflare Worker server to expose Restate services.
pub struct CfWorkerServer {
endpoint: Endpoint,
}

impl From<Endpoint> for CfWorkerServer {
fn from(endpoint: Endpoint) -> Self {
Self { endpoint }
}
}

Check warning on line 46 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs

impl CfWorkerServer {

pub fn new(endpoint: Endpoint) -> Self {
Self { endpoint }
}

pub async fn call(&self, req: HttpRequest) -> worker::Result<http::Response<worker::Body>> {
let headers = req.headers().to_owned();
let (parts, request_body) = req.into_parts();
let result = self.endpoint.resolve(parts.uri.path(), headers);

Check warning on line 58 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
if let Ok(response) = result {
match response {
crate::endpoint::Response::ReplyNow { status_code, headers, body } => {

// convert outbound body data from a buffer into a readable stream
let readable_stream = bytes_to_readable_stream(body)?;
let mut http_response = http::Response::builder()
.status(status_code)
.body(Body::new(readable_stream))?;

for header in headers {
let key = HeaderName::from_str(header.key.as_ref())?;
let value = HeaderValue::from_str(header.value.as_ref())?;
http_response.headers_mut().insert(key, value);
}

Check warning on line 74 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
Ok(http_response)
}
crate::endpoint::Response::BidiStream { status_code, headers, handler} => {

// Cloudflare Workers don't support HTTP 1.1/HTTP 2 bididirectional streams
// Implenting this as a workaround using existing handler api, expecting a hyper request stream
// Reads entire request/reponse body to proxy data across WASM boundary

Check warning on line 82 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
let js_stream: ReadableStream = request_body.into_inner().unwrap().unchecked_into();
let reader: ReadableStreamDefaultReader = js_stream.get_reader().unchecked_into();


// Drain the inbound Request API body data from a Stream API ReadableStream into a buffer
let mut request_body = Vec::new();
loop {
let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await;

Check warning on line 90 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
match read_result {
Ok(js_value) => {
let done = js_sys::Reflect::get(&js_value, &JsValue::from_str("done")).unwrap();
if done.as_bool().unwrap_or(false) {
break;
}
let value = js_sys::Reflect::get(&js_value, &JsValue::from_str("value")).unwrap();

Check warning on line 97 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs
let uint8_array: Uint8Array = value.unchecked_into();
let mut bytes = vec![0u8; uint8_array.length() as usize];
uint8_array.copy_to(&mut bytes);
request_body.extend_from_slice(&bytes);
}
Err(_) => break,
}
}

// Create a Rust stream from the collected bytes -- this maps to existing restate sdk stream implementation for hanlders
let request_bytes = bytes::Bytes::from(request_body);
let stream = futures_util::stream::once(async move {
Ok::<bytes::Bytes, Box<dyn std::error::Error + Send + Sync>>(request_bytes)
});

let input_receiver = InputReceiver::from_stream(stream);

let (output_tx, output_rx) = mpsc::unbounded_channel();
let output_sender = OutputSender::from_channel(output_tx);

// Execute handler and collect output
let _ = handler.handle(input_receiver, output_sender).await;

// Collect all output chunks to proxy body response across WASM boundary
let mut response_body = Vec::new();
let mut rx = output_rx;
while let Some(chunk) = rx.recv().await {
response_body.extend_from_slice(&chunk);
}

Check warning on line 126 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs

// Build HTTP Response-- converting outbound buffer into readable stream
let readable_stream = bytes_to_readable_stream(bytes::Bytes::from(response_body))?;
let mut http_response = http::Response::builder()
.status(status_code)
.body(Body::new(readable_stream))?;

for header in headers {
let key = HeaderName::from_str(header.key.as_ref())?;
let value = HeaderValue::from_str(header.value.as_ref())?;
http_response.headers_mut().insert(key, value);
}

Check warning on line 138 in src/cf_workers.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

Diff in /home/runner/work/sdk-rust/sdk-rust/src/cf_workers.rs

Ok(http_response)
},
}
}
else {
let http_response = http::Response::builder().status(StatusCode::BAD_REQUEST).body(worker::Body::empty())?;
Ok(http_response)
}
}
}
5 changes: 5 additions & 0 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ impl Builder {
Self::default()
}

pub fn with_protocol_mode(mut self, mode: crate::discovery::ProtocolMode) -> Self {
self.discovery.protocol_mode = Some(mode);
self
}

/// Add a [`Service`] to this endpoint.
///
/// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros,
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ pub mod filter;
pub mod http_server;
#[cfg(feature = "hyper")]
pub mod hyper;
#[cfg(feature = "cf_workers")]
pub mod cf_workers;
pub mod serde;

/// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1).
Expand Down Expand Up @@ -509,6 +511,9 @@ pub mod prelude {
#[cfg(feature = "http_server")]
pub use crate::http_server::HttpServer;

#[cfg(feature = "cf_workers")]
pub use crate::cf_workers::CfWorkerServer;

pub use crate::context::{
CallFuture, Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState,
ContextSideEffects, ContextTimers, ContextWriteState, HeaderMap, InvocationHandle,
Expand Down
31 changes: 31 additions & 0 deletions tests/cf_workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#![cfg(target_family = "wasm")]

// test requires wasm-pack / wasm-bindgen-test framework
// use following command to execute test:
// RUSTFLAGS='--cfg getrandom_backend="wasm_js"' wasm-pack test --node -- --no-default-features --features cf_workers --test cf_workers
use restate_sdk::{cf_workers::CfWorkerServer, prelude::*};
use wasm_bindgen_test::*;

#[restate_sdk::service]
trait MyService {
async fn my_handler() -> HandlerResult<()>;
}

struct MyServiceImpl;

impl MyService for MyServiceImpl {
async fn my_handler(&self, _: Context<'_>) -> HandlerResult<()> {
Ok(())
}
}

#[wasm_bindgen_test]
fn cf_workerservice_handler() {
let endpoint = Endpoint::builder()
.bind(MyServiceImpl.serve())
.build();

let cf_server = CfWorkerServer::new(endpoint);
let health_check_request = http::Request::builder().uri("/health").body(worker::Body::empty()).unwrap();
let result = cf_server.call(health_check_request);
}
Loading