Skip to content

Commit 8e22ff8

Browse files
wasi-http: use UnsyncBoxBody to support axum (#11941)
The Body type used by the axum web application framework is not Sync, and not compatible with http_body_util::combinations::BoxBody. Luckily, an alternative UnsyncBoxBody exists. Replace all wasi-http p3 usages of BoxBody with UnsyncBoxBody. Some wasi-http p2 code refers directly to hyper types which use the sync BoxBody. Do not modify those.
1 parent 311bf0f commit 8e22ff8

File tree

7 files changed

+34
-34
lines changed

7 files changed

+34
-34
lines changed

crates/wasi-http/src/p3/body.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use core::pin::Pin;
88
use core::task::{Context, Poll, ready};
99
use http::HeaderMap;
1010
use http_body::Body as _;
11-
use http_body_util::combinators::BoxBody;
11+
use http_body_util::combinators::UnsyncBoxBody;
1212
use std::any::{Any, TypeId};
1313
use std::io::Cursor;
1414
use std::sync::Arc;
@@ -34,7 +34,7 @@ pub(crate) enum Body {
3434
/// Body constructed by the host.
3535
Host {
3636
/// The [`http_body::Body`]
37-
body: BoxBody<Bytes, ErrorCode>,
37+
body: UnsyncBoxBody<Bytes, ErrorCode>,
3838
/// Channel, on which transmission result will be written
3939
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
4040
},
@@ -441,7 +441,7 @@ where
441441

442442
/// [StreamProducer] implementation for bodies originating in the host.
443443
pub(crate) struct HostBodyStreamProducer<T> {
444-
pub(crate) body: BoxBody<Bytes, ErrorCode>,
444+
pub(crate) body: UnsyncBoxBody<Bytes, ErrorCode>,
445445
trailers: Option<oneshot::Sender<Result<Option<Resource<Trailers>>, ErrorCode>>>,
446446
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
447447
}

crates/wasi-http/src/p3/host/handler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ impl HostWithStore for WasiHttp {
240240
getter,
241241
)
242242
.with_state(io_task_rx)
243-
.boxed(),
243+
.boxed_unsync(),
244244
Body::Host { body, result_tx } => {
245245
if let Some(limit) = content_length {
246246
let (http_result_tx, http_result_rx) = oneshot::channel();
@@ -256,10 +256,10 @@ impl HostWithStore for WasiHttp {
256256
ErrorCode::HttpRequestBodySize,
257257
)
258258
.with_state(io_task_rx)
259-
.boxed()
259+
.boxed_unsync()
260260
} else {
261261
_ = result_tx.send(Box::new(io_task_result(io_result_rx)));
262-
body.with_state(io_task_rx).boxed()
262+
body.with_state(io_task_rx).boxed_unsync()
263263
}
264264
}
265265
};
@@ -331,7 +331,7 @@ impl HostWithStore for WasiHttp {
331331
let io = Arc::new(AbortOnDropJoinHandle(io));
332332
_ = io_result_tx.send((Arc::clone(&io), rx));
333333
_ = io_task_tx.send(Arc::clone(&io));
334-
body.with_state(io).boxed()
334+
body.with_state(io).boxed_unsync()
335335
}
336336
};
337337
let res = Response {

crates/wasi-http/src/p3/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use bytes::Bytes;
2828
use core::ops::Deref;
2929
use http::HeaderName;
3030
use http::uri::Scheme;
31-
use http_body_util::combinators::BoxBody;
31+
use http_body_util::combinators::UnsyncBoxBody;
3232
use std::sync::Arc;
3333
use wasmtime::component::{HasData, Linker, ResourceTable};
3434
use wasmtime_wasi::TrappableError;
@@ -95,13 +95,13 @@ pub trait WasiHttpCtx: Send {
9595
#[cfg(feature = "default-send-request")]
9696
fn send_request(
9797
&mut self,
98-
request: http::Request<BoxBody<Bytes, ErrorCode>>,
98+
request: http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,
9999
options: Option<RequestOptions>,
100100
fut: Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
101101
) -> Box<
102102
dyn Future<
103103
Output = HttpResult<(
104-
http::Response<BoxBody<Bytes, ErrorCode>>,
104+
http::Response<UnsyncBoxBody<Bytes, ErrorCode>>,
105105
Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
106106
)>,
107107
> + Send,
@@ -112,7 +112,7 @@ pub trait WasiHttpCtx: Send {
112112

113113
let (res, io) = default_send_request(request, options).await?;
114114
Ok((
115-
res.map(BodyExt::boxed),
115+
res.map(BodyExt::boxed_unsync),
116116
Box::new(io) as Box<dyn Future<Output = _> + Send>,
117117
))
118118
})
@@ -137,13 +137,13 @@ pub trait WasiHttpCtx: Send {
137137
#[cfg(not(feature = "default-send-request"))]
138138
fn send_request(
139139
&mut self,
140-
request: http::Request<BoxBody<Bytes, ErrorCode>>,
140+
request: http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,
141141
options: Option<RequestOptions>,
142142
fut: Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
143143
) -> Box<
144144
dyn Future<
145145
Output = HttpResult<(
146-
http::Response<BoxBody<Bytes, ErrorCode>>,
146+
http::Response<UnsyncBoxBody<Bytes, ErrorCode>>,
147147
Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
148148
)>,
149149
> + Send,

crates/wasi-http/src/p3/request.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use core::time::Duration;
55
use http::uri::{Authority, PathAndQuery, Scheme};
66
use http::{HeaderMap, Method};
77
use http_body_util::BodyExt as _;
8-
use http_body_util::combinators::BoxBody;
8+
use http_body_util::combinators::UnsyncBoxBody;
99
use std::sync::Arc;
1010
use tokio::sync::oneshot;
1111

@@ -52,7 +52,7 @@ impl Request {
5252
path_with_query: Option<PathAndQuery>,
5353
headers: impl Into<Arc<HeaderMap>>,
5454
options: Option<Arc<RequestOptions>>,
55-
body: impl Into<BoxBody<Bytes, ErrorCode>>,
55+
body: impl Into<UnsyncBoxBody<Bytes, ErrorCode>>,
5656
) -> (
5757
Self,
5858
impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
@@ -91,7 +91,7 @@ impl Request {
9191
impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
9292
)
9393
where
94-
T: http_body::Body<Data = Bytes> + Send + Sync + 'static,
94+
T: http_body::Body<Data = Bytes> + Send + 'static,
9595
T::Error: Into<ErrorCode>,
9696
{
9797
let (
@@ -116,7 +116,7 @@ impl Request {
116116
path_and_query,
117117
headers,
118118
None,
119-
body.map_err(Into::into).boxed(),
119+
body.map_err(Into::into).boxed_unsync(),
120120
)
121121
}
122122
}

crates/wasi-http/src/p3/response.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use anyhow::Context as _;
66
use bytes::Bytes;
77
use http::{HeaderMap, StatusCode};
88
use http_body_util::BodyExt as _;
9-
use http_body_util::combinators::BoxBody;
9+
use http_body_util::combinators::UnsyncBoxBody;
1010
use std::sync::Arc;
1111
use wasmtime::AsContextMut;
1212

@@ -47,7 +47,7 @@ impl Response {
4747
self,
4848
store: impl AsContextMut<Data = T>,
4949
fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
50-
) -> wasmtime::Result<http::Response<BoxBody<Bytes, ErrorCode>>> {
50+
) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
5151
self.into_http_with_getter(store, fut, T::http)
5252
}
5353

@@ -58,7 +58,7 @@ impl Response {
5858
store: impl AsContextMut<Data = T>,
5959
fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
6060
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
61-
) -> wasmtime::Result<http::Response<BoxBody<Bytes, ErrorCode>>> {
61+
) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {
6262
let res = http::Response::try_from(self)?;
6363
let (res, body) = res.into_parts();
6464
let body = match body {
@@ -80,7 +80,7 @@ impl Response {
8080
ErrorCode::HttpResponseBodySize,
8181
getter,
8282
)
83-
.boxed()
83+
.boxed_unsync()
8484
}
8585
Body::Host { body, result_tx } => {
8686
_ = result_tx.send(Box::new(fut));

crates/wasi-http/tests/all/p3/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures::SinkExt;
88
use futures::channel::oneshot;
99
use http::HeaderValue;
1010
use http_body::Body;
11-
use http_body_util::{BodyExt as _, Collected, Empty, combinators::BoxBody};
11+
use http_body_util::{BodyExt as _, Collected, Empty, combinators::UnsyncBoxBody};
1212
use std::io::Write;
1313
use std::path::Path;
1414
use test_programs_artifacts::*;
@@ -29,7 +29,7 @@ use wasmtime_wasi_http::types::DEFAULT_FORBIDDEN_HEADERS;
2929
foreach_p3_http!(assert_test_exists);
3030

3131
struct TestHttpCtx {
32-
request_body_tx: Option<oneshot::Sender<BoxBody<Bytes, ErrorCode>>>,
32+
request_body_tx: Option<oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>>,
3333
}
3434

3535
impl WasiHttpCtx for TestHttpCtx {
@@ -39,14 +39,14 @@ impl WasiHttpCtx for TestHttpCtx {
3939

4040
fn send_request(
4141
&mut self,
42-
request: http::Request<BoxBody<Bytes, ErrorCode>>,
42+
request: http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,
4343
options: Option<RequestOptions>,
4444
fut: Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
4545
) -> Box<
4646
dyn Future<
4747
Output = Result<
4848
(
49-
http::Response<BoxBody<Bytes, ErrorCode>>,
49+
http::Response<UnsyncBoxBody<Bytes, ErrorCode>>,
5050
Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
5151
),
5252
TrappableError<ErrorCode>,
@@ -72,7 +72,7 @@ impl WasiHttpCtx for TestHttpCtx {
7272

7373
let (res, io) = p3::default_send_request(request, options).await?;
7474
Ok((
75-
res.map(BodyExt::boxed),
75+
res.map(BodyExt::boxed_unsync),
7676
Box::new(io) as Box<dyn Future<Output = _> + Send>,
7777
))
7878
})
@@ -87,7 +87,7 @@ struct Ctx {
8787
}
8888

8989
impl Ctx {
90-
fn new(request_body_tx: oneshot::Sender<BoxBody<Bytes, ErrorCode>>) -> Self {
90+
fn new(request_body_tx: oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>) -> Self {
9191
Self {
9292
table: ResourceTable::default(),
9393
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
@@ -150,7 +150,7 @@ async fn run_cli(path: &str, server: &Server) -> anyhow::Result<()> {
150150
async fn run_http<E: Into<ErrorCode> + 'static>(
151151
component_filename: &str,
152152
req: http::Request<impl Body<Data = Bytes, Error = E> + Send + Sync + 'static>,
153-
request_body_tx: oneshot::Sender<BoxBody<Bytes, ErrorCode>>,
153+
request_body_tx: oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>,
154154
) -> anyhow::Result<Result<http::Response<Collected<Bytes>>, Option<ErrorCode>>> {
155155
let engine = test_programs_artifacts::engine(|config| {
156156
config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);

src/commands/serve.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use clap::Parser;
55
use futures::future::FutureExt;
66
use http::{Response, StatusCode};
77
use http_body_util::BodyExt as _;
8-
use http_body_util::combinators::BoxBody;
8+
use http_body_util::combinators::UnsyncBoxBody;
99
use std::convert::Infallible;
1010
use std::net::SocketAddr;
1111
use std::pin::Pin;
@@ -568,7 +568,7 @@ impl ServeCommand {
568568
.body(
569569
Full::new(bytes::Bytes::from(error_html))
570570
.map_err(|_| unreachable!())
571-
.boxed(),
571+
.boxed_unsync(),
572572
)
573573
.unwrap())
574574
}
@@ -826,7 +826,7 @@ type Request = hyper::Request<hyper::body::Incoming>;
826826
async fn handle_request(
827827
handler: ProxyHandler<HostHandlerState>,
828828
req: Request,
829-
) -> Result<hyper::Response<BoxBody<Bytes, anyhow::Error>>> {
829+
) -> Result<hyper::Response<UnsyncBoxBody<Bytes, anyhow::Error>>> {
830830
use tokio::sync::oneshot;
831831

832832
let req_id = handler.next_req_id();
@@ -847,7 +847,7 @@ async fn handle_request(
847847
hyper::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
848848
p2::http::types::ErrorCode,
849849
>;
850-
type P3Response = hyper::Response<BoxBody<Bytes, anyhow::Error>>;
850+
type P3Response = hyper::Response<UnsyncBoxBody<Bytes, anyhow::Error>>;
851851

852852
enum Sender {
853853
P2(oneshot::Sender<P2Response>),
@@ -908,7 +908,7 @@ async fn handle_request(
908908
let (res, task) = proxy.handle(store, request).await??;
909909
let res = store
910910
.with(|mut store| res.into_http(&mut store, request_io_result))?;
911-
_ = tx.send(res.map(|body| body.map_err(|e| e.into()).boxed()));
911+
_ = tx.send(res.map(|body| body.map_err(|e| e.into()).boxed_unsync()));
912912

913913
// Wait for the task to finish.
914914
task.block(store).await;
@@ -930,7 +930,7 @@ async fn handle_request(
930930
.await
931931
.context("guest never invoked `response-outparam::set` method")?
932932
.map_err(|e| anyhow::Error::from(e))?
933-
.map(|body| body.map_err(|e| e.into()).boxed()),
933+
.map(|body| body.map_err(|e| e.into()).boxed_unsync()),
934934
Receiver::P3(rx) => rx.await?,
935935
})
936936
}

0 commit comments

Comments
 (0)