Skip to content

Commit 149da63

Browse files
committed
feat: Add sqlx runtime implementations for Wasip3 trait and minor runtime adjustments
Signed-off-by: Aditya <aditya.salunkh919@gmail.com>
1 parent c4c69bf commit 149da63

File tree

7 files changed

+98
-77
lines changed

7 files changed

+98
-77
lines changed

sqlx-core/src/fs.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
5050
let path = PathBuf::from(path.as_ref());
5151
#[cfg(not(target_arch = "wasm32"))]
5252
{
53-
rt::spawn_blocking(move || std::fs::create_dir_all(path)).await
53+
rt::spawn_blocking(move || std::fs::create_dir_all(path)).await
5454
}
5555
#[cfg(target_arch = "wasm32")]
5656
{
@@ -103,7 +103,6 @@ pub async fn read_dir(path: PathBuf) -> io::Result<ReadDir> {
103103
})
104104
}
105105

106-
107106
#[cfg(target_arch = "wasm32")]
108107
pub async fn read_dir(path: PathBuf) -> io::Result<ReadDir> {
109108
todo!()
@@ -144,8 +143,6 @@ impl ReadDir {
144143
} else {
145144
Ok(None)
146145
}
147-
148-
149146
}
150147
#[cfg(target_arch = "wasm32")]
151148
todo!()

sqlx-core/src/migrate/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,5 @@ pub use source::{MigrationSource, ResolveConfig, ResolveWith};
1717
#[doc(hidden)]
1818
pub use source::resolve;
1919
#[cfg(not(target_arch = "wasm32"))]
20-
2120
#[doc(hidden)]
2221
pub use source::{resolve_blocking, resolve_blocking_with_config};

sqlx-core/src/net/socket/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ pub async fn connect_tcp<Ws: WithSocket>(
186186
port: u16,
187187
with_socket: Ws,
188188
) -> crate::Result<Ws::Output> {
189-
#[cfg(all(feature = "_rt-tokio", not(target_arch = "wasm32")))]
189+
#[cfg(all(feature = "_rt-tokio", not(target_arch = "wasm32")))]
190190
if crate::rt::rt_tokio::available() {
191191
return Ok(with_socket
192192
.with_socket(tokio::net::TcpStream::connect((host, port)).await?)
@@ -206,7 +206,6 @@ pub async fn connect_tcp<Ws: WithSocket>(
206206
let res = crate::rt::rt_wasip3::connect_tcp(host, port, with_socket).await;
207207
return res;
208208
}
209-
210209
}
211210

212211
/// Open a TCP socket to `host` and `port`.

sqlx-core/src/rt/mod.rs

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ pub enum JoinHandle<T> {
2626
#[cfg(feature = "_rt-tokio")]
2727
Tokio(tokio::task::JoinHandle<T>),
2828

29+
// WASI P3 runtime
30+
#[cfg(target_arch = "wasm32")]
31+
Wasip3(crate::rt::rt_wasip3::JoinHandle<T>),
32+
2933
// Implementation shared by `smol` and `async-global-executor`
3034
#[cfg(feature = "_rt-async-task")]
3135
AsyncTask(Option<async_task::Task<T>>),
@@ -37,30 +41,20 @@ pub enum JoinHandle<T> {
3741
pub async fn timeout<F: Future>(duration: Duration, f: F) -> Result<F::Output, TimeoutError> {
3842
#[cfg(target_arch = "wasm32")]
3943
{
40-
let timeout = crate::rt::rt_wasip3::spawn(wasi::clocks::monotonic_clock::wait_for(
44+
let timeout_future = wasip3::clocks::monotonic_clock::wait_for(
4145
duration.as_nanos().try_into().unwrap_or(u64::MAX),
42-
));
43-
let mut timeout = core::pin::pin!(timeout);
46+
);
47+
let mut timeout = core::pin::pin!(timeout_future);
4448
let mut f = core::pin::pin!(f);
45-
core::future::poll_fn(|cx| {
46-
match timeout.as_mut().poll(cx) {
47-
Poll::Ready(Some(())) => {
48-
Poll::Ready(Err(TimeoutError(())))
49-
}
50-
Poll::Ready(None) => {
51-
Poll::Ready(Err(TimeoutError(())))
52-
}
53-
Poll::Pending => {
54-
f.as_mut().poll(cx).map(Ok)
55-
}
56-
}
49+
50+
return core::future::poll_fn(|cx| match timeout.as_mut().poll(cx) {
51+
Poll::Ready(_) => Poll::Ready(Err(TimeoutError)),
52+
Poll::Pending => f.as_mut().poll(cx).map(Ok),
5753
})
58-
.await
54+
.await;
5955
}
6056

61-
#[cfg(all(feature = "_rt-tokio", not(target_arch = "wasm32")))]
62-
#[cfg(debug_assertions)]
63-
let f = Box::pin(f);
57+
#[cfg(feature = "_rt-tokio")]
6458
if rt_tokio::available() {
6559
return tokio::time::timeout(duration, f)
6660
.await
@@ -80,11 +74,11 @@ pub async fn timeout<F: Future>(duration: Duration, f: F) -> Result<F::Output, T
8074
pub async fn sleep(duration: Duration) {
8175
#[cfg(target_arch = "wasm32")]
8276
{
83-
return crate::rt::rt_wasip3::spawn(wasi::clocks::monotonic_clock::wait_for(
77+
wasip3::clocks::monotonic_clock::wait_for(
8478
duration.as_nanos().try_into().unwrap_or(u64::MAX),
85-
))
86-
.await
87-
.unwrap();
79+
)
80+
.await;
81+
return;
8882
}
8983
#[cfg(feature = "_rt-tokio")]
9084
if rt_tokio::available() {
@@ -107,6 +101,11 @@ where
107101
F: Future + Send + 'static,
108102
F::Output: Send + 'static,
109103
{
104+
#[cfg(target_arch = "wasm32")]
105+
{
106+
return JoinHandle::Wasip3(crate::rt::rt_wasip3::spawn(fut));
107+
}
108+
110109
#[cfg(feature = "_rt-tokio")]
111110
if let Ok(handle) = tokio::runtime::Handle::try_current() {
112111
return JoinHandle::Tokio(handle.spawn(fut));
@@ -124,15 +123,6 @@ where
124123
}
125124
}
126125
}
127-
#[cfg(target_arch = "wasm32")]
128-
#[track_caller]
129-
pub fn spawn<F>(fut: F) -> JoinHandle<F::Output>
130-
where
131-
F: Future + Send + 'static,
132-
F::Output: Send + 'static,
133-
{
134-
JoinHandle::Tokio(tokio::task::spawn(fut))
135-
}
136126

137127
#[cfg(target_arch = "wasm32")]
138128
#[track_caller]
@@ -141,8 +131,9 @@ where
141131
F: FnOnce() -> R + Send + 'static,
142132
R: Send + 'static,
143133
{
144-
// In WASI P3, we use our async spawn_blocking implementation
145-
JoinHandle::Tokio(tokio::task::spawn(crate::rt::rt_wasip3::spawn_blocking(f)))
134+
JoinHandle::Wasip3(crate::rt::rt_wasip3::spawn(
135+
crate::rt::rt_wasip3::spawn_blocking(f),
136+
))
146137
}
147138

148139
#[cfg(not(target_arch = "wasm32"))]
@@ -175,7 +166,7 @@ pub async fn yield_now() {
175166
{
176167
return crate::rt::rt_wasip3::yield_now().await;
177168
}
178-
169+
179170
#[cfg(feature = "_rt-tokio")]
180171
if rt_tokio::available() {
181172
return tokio::task::yield_now().await;
@@ -204,14 +195,15 @@ pub async fn yield_now() {
204195
.await
205196
}
206197

198+
#[cfg(not(target_arch = "wasm32"))]
207199
#[track_caller]
208200
pub fn test_block_on<F: Future>(f: F) -> F::Output {
209201
#[cfg(feature = "_rt-async-io")]
210202
{
211203
return async_io::block_on(f);
212204
}
213205

214-
#[cfg(any(feature = "_rt-tokio", target_arch = "wasm32"))]
206+
#[cfg(feature = "_rt-tokio")]
215207
{
216208
return tokio::runtime::Builder::new_current_thread()
217209
.enable_all()
@@ -223,7 +215,7 @@ pub fn test_block_on<F: Future>(f: F) -> F::Output {
223215
#[cfg(all(
224216
feature = "_rt-async-std",
225217
not(feature = "_rt-async-io"),
226-
not(any(feature = "_rt-tokio", target_arch = "wasm32"))
218+
not(feature = "_rt-tokio")
227219
))]
228220
{
229221
return async_std::task::block_on(f);
@@ -232,14 +224,22 @@ pub fn test_block_on<F: Future>(f: F) -> F::Output {
232224
#[cfg(not(any(
233225
feature = "_rt-async-io",
234226
feature = "_rt-async-std",
235-
feature = "_rt-tokio",
236-
target_arch = "wasm32"
227+
feature = "_rt-tokio"
237228
)))]
238229
{
239230
missing_rt(f)
240231
}
241232
}
242233

234+
#[cfg(target_arch = "wasm32")]
235+
#[track_caller]
236+
pub fn test_block_on<F: Future + 'static>(f: F) -> F::Output
237+
where
238+
F::Output: 'static,
239+
{
240+
wasip3::wit_bindgen::rt::async_support::block_on(f)
241+
}
242+
243243
#[track_caller]
244244
pub const fn missing_rt<T>(_unused: T) -> ! {
245245
if cfg!(feature = "_rt-tokio") {
@@ -264,11 +264,14 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
264264
.expect("BUG: task taken")
265265
.poll(cx),
266266

267-
#[cfg(any(feature = "_rt-tokio",target_arch = "wasm32"))]
267+
#[cfg(feature = "_rt-tokio")]
268268
Self::Tokio(handle) => Pin::new(handle)
269269
.poll(cx)
270270
.map(|res| res.expect("spawned task panicked")),
271271

272+
#[cfg(target_arch = "wasm32")]
273+
Self::Wasip3(handle) => Pin::new(handle).poll(cx),
274+
272275
Self::_Phantom(_) => {
273276
let _ = cx;
274277
unreachable!("runtime should have been checked on spawn")

sqlx-core/src/rt/rt_wasip3/mod.rs

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ use core::future::Future;
22
use core::pin::Pin;
33
use core::task::{Context, Poll};
44

5+
use crate::net::WithSocket;
56
use bytes::{Buf, BytesMut};
67
use wasip3::sockets::types::{IpAddressFamily, IpSocketAddress, TcpSocket as WasiTcpSocket};
7-
use wasip3::wit_stream;
88
use wasip3::wit_bindgen::StreamResult;
9-
10-
use crate::net::WithSocket;
9+
use wasip3::wit_stream;
1110

1211
mod socket;
1312

@@ -28,14 +27,12 @@ impl<T> Future for JoinHandle<T> {
2827
pub fn spawn<T: 'static + Send>(fut: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
2928
JoinHandle {
3029
future: Box::pin(async move {
31-
// Yield to allow other tasks to run cooperatively
32-
wasip3::wit_bindgen::yield_async().await;
30+
wasip3::wit_bindgen::yield_async().await;
3331
fut.await
3432
}),
3533
}
3634
}
3735

38-
// CPU-intensive operations using wit_bindgen's yield_blocking
3936
pub fn spawn_blocking<F, R>(f: F) -> impl Future<Output = R>
4037
where
4138
F: FnOnce() -> R + Send + 'static,
@@ -48,22 +45,34 @@ where
4845
}
4946
}
5047

51-
// Native async yielding
48+
// Cooperative yielding for WASI P3
5249
pub async fn yield_now() {
53-
wasip3::wit_bindgen::yield_async().await
50+
wasip3::wit_bindgen::yield_async().await;
5451
}
5552

5653
// Modern WASI P3 TcpSocket using wit_stream for async I/O
5754
pub struct TcpSocket {
5855
wasi_socket: WasiTcpSocket,
5956
read_buffer: BytesMut,
57+
// Active read operation using WASI's waitable system
58+
read_operation: Option<Pin<Box<dyn std::future::Future<Output = std::io::Result<Vec<u8>>> + Send + Sync>>>,
59+
// Active write operation using WASI's waitable system
60+
// write_operation: Option<Pin<Box<dyn std::future::Future<Output = std::io::Result<usize>> + Send + Sync>>>,
61+
// // Write buffer for pending data
62+
// write_buffer: BytesMut,
63+
// Write readiness state
64+
write_ready: bool,
6065
}
6166

6267
impl TcpSocket {
6368
fn new(wasi_socket: WasiTcpSocket) -> Self {
6469
Self {
6570
wasi_socket,
6671
read_buffer: BytesMut::new(),
72+
read_operation: None,
73+
// write_operation: None,
74+
// write_buffer: BytesMut::new(),
75+
write_ready: true,
6776
}
6877
}
6978

@@ -82,36 +91,40 @@ impl TcpSocket {
8291
(StreamResult::Complete(_), data) => {
8392
let to_copy = std::cmp::min(buf.len(), data.len());
8493
buf[..to_copy].copy_from_slice(&data[..to_copy]);
85-
94+
8695
// Buffer remaining data
8796
if data.len() > to_copy {
8897
self.read_buffer.extend_from_slice(&data[to_copy..]);
8998
}
90-
99+
91100
Ok(to_copy)
92101
}
93102
(StreamResult::Dropped, _) => Ok(0),
94-
(StreamResult::Cancelled, _) => {
95-
Err(std::io::Error::new(std::io::ErrorKind::Interrupted, "Read cancelled"))
96-
}
103+
(StreamResult::Cancelled, _) => Err(std::io::Error::new(
104+
std::io::ErrorKind::Interrupted,
105+
"Read cancelled",
106+
)),
97107
}
98108
}
99109

100110
pub async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
101111
let (mut tx, rx) = wit_stream::new();
102-
112+
103113
// Start the send operation asynchronously
104114
let send_fut = self.wasi_socket.send(rx);
105-
115+
106116
// Write the data
107117
let remaining = tx.write_all(buf.to_vec()).await;
108118
drop(tx);
109-
119+
110120
// Wait for send to complete
111121
send_fut.await.map_err(|e| {
112-
std::io::Error::new(std::io::ErrorKind::BrokenPipe, format!("Send failed: {:?}", e))
122+
std::io::Error::new(
123+
std::io::ErrorKind::BrokenPipe,
124+
format!("Send failed: {:?}", e),
125+
)
113126
})?;
114-
127+
115128
if remaining.is_empty() {
116129
Ok(buf.len())
117130
} else {
@@ -125,12 +138,22 @@ pub async fn connect_tcp<Ws: WithSocket>(
125138
port: u16,
126139
with_socket: Ws,
127140
) -> crate::Result<Ws::Output> {
128-
let addresses = wasip3::sockets::ip_name_lookup::resolve_addresses(host.to_string()).await
129-
.map_err(|e| crate::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, format!("DNS failed: {:?}", e))))?;
130-
131-
let ip = addresses.into_iter().next()
132-
.ok_or_else(|| crate::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "No addresses found")))?;
133-
141+
let addresses = wasip3::sockets::ip_name_lookup::resolve_addresses(host.to_string())
142+
.await
143+
.map_err(|e| {
144+
crate::Error::Io(std::io::Error::new(
145+
std::io::ErrorKind::Other,
146+
format!("DNS failed: {:?}", e),
147+
))
148+
})?;
149+
150+
let ip = addresses.into_iter().next().ok_or_else(|| {
151+
crate::Error::Io(std::io::Error::new(
152+
std::io::ErrorKind::Other,
153+
"No addresses found",
154+
))
155+
})?;
156+
134157
let addr = match ip {
135158
wasip3::sockets::types::IpAddress::Ipv4(ipv4) => {
136159
IpSocketAddress::Ipv4(wasip3::sockets::types::Ipv4SocketAddress {
@@ -154,7 +177,7 @@ pub async fn connect_tcp<Ws: WithSocket>(
154177
format!("failed to create socket: {:?}", e),
155178
))
156179
})?;
157-
180+
158181
wasi_socket.connect(addr).await.map_err(|e| {
159182
crate::Error::Io(std::io::Error::new(
160183
std::io::ErrorKind::ConnectionRefused,
@@ -165,4 +188,4 @@ pub async fn connect_tcp<Ws: WithSocket>(
165188
let tcp_socket = TcpSocket::new(wasi_socket);
166189

167190
Ok(with_socket.with_socket(tcp_socket).await)
168-
}
191+
}

0 commit comments

Comments
 (0)