Skip to content

Commit ab52263

Browse files
committed
refactor: build on wasm, do not use send_blocking
1 parent 96e45ae commit ab52263

File tree

15 files changed

+331
-264
lines changed

15 files changed

+331
-264
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "Fran
3737
iroh-gossip = { version = "0.94", features = ["net"], default-features = false }
3838
iroh-metrics = { version = "0.36", default-features = false }
3939
irpc = { git = "https://github.com/n0-computer/irpc", branch = "main", default-features = false }
40-
n0-future = "0.3"
40+
n0-future = { version = "0.3", features = ["serde"] }
4141
num_enum = "0.7"
4242
postcard = { version = "1", default-features = false, features = [
4343
"alloc",
@@ -98,3 +98,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] }
9898

9999
[build-dependencies]
100100
cfg_aliases = "0.2.1"
101+
102+
[patch.crates-io]
103+
n0-future = { path = "../n0-future" }

src/actor.rs

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::{
44
collections::{hash_map, HashMap},
55
num::NonZeroU64,
66
sync::Arc,
7-
thread::JoinHandle,
87
time::Duration,
98
};
109

@@ -18,6 +17,9 @@ use serde::{Deserialize, Serialize};
1817
use tokio::sync::oneshot;
1918
use tracing::{debug, error, error_span, trace, warn};
2019

20+
#[cfg(wasm_browser)]
21+
use tracing::Instrument;
22+
2123
use crate::{
2224
api::{
2325
protocol::{AuthorListResponse, ListResponse},
@@ -228,6 +230,7 @@ struct OpenReplica {
228230
pub struct SyncHandle {
229231
tx: async_channel::Sender<Action>,
230232
#[cfg(wasm_browser)]
233+
#[allow(unused)]
231234
join_handle: Arc<Option<n0_future::task::JoinHandle<()>>>,
232235
#[cfg(not(wasm_browser))]
233236
join_handle: Arc<Option<std::thread::JoinHandle<()>>>,
@@ -275,21 +278,22 @@ impl SyncHandle {
275278
metrics: metrics.clone(),
276279
};
277280

281+
let span = error_span!("sync", %me);
278282
#[cfg(wasm_browser)]
279-
let join_handle = n0_future::task::spawn(actor.run_async());
283+
let join_handle = n0_future::task::spawn(actor.run_async().instrument(span));
280284

281285
#[cfg(not(wasm_browser))]
282286
let join_handle = std::thread::Builder::new()
283287
.name("sync-actor".to_string())
284288
.spawn(move || {
285-
let span = error_span!("sync", %me);
286289
let _enter = span.enter();
287290

288291
if let Err(err) = actor.run_in_thread() {
289292
error!("Sync actor closed with error: {err:?}");
290293
}
291294
})
292295
.expect("failed to spawn thread");
296+
293297
let join_handle = Arc::new(Some(join_handle));
294298
SyncHandle {
295299
tx: action_tx,
@@ -602,15 +606,22 @@ impl SyncHandle {
602606

603607
impl Drop for SyncHandle {
604608
fn drop(&mut self) {
609+
#[cfg(wasm_browser)]
610+
{
611+
let tx = self.tx.clone();
612+
n0_future::task::spawn(async move {
613+
tx.send(Action::Shutdown { reply: None }).await.ok();
614+
});
615+
}
605616
// this means we're dropping the last reference
617+
#[cfg(not(wasm_browser))]
606618
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
607619
// this call is the reason tx can not be a tokio mpsc channel.
608620
// we have no control about where drop is called, yet tokio send_blocking panics
609621
// when called from inside a tokio runtime.
610622
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
611623
let handle = handle.take().expect("this can only run once");
612624

613-
#[cfg(not(wasm_browser))]
614625
if let Err(err) = handle.join() {
615626
warn!(?err, "Failed to join sync actor");
616627
}
@@ -628,10 +639,7 @@ struct Actor {
628639
}
629640

630641
impl Actor {
631-
async fn run_in_task(self) {
632-
self.run_async().await
633-
}
634-
642+
#[cfg(not(wasm_browser))]
635643
fn run_in_thread(self) -> Result<()> {
636644
let rt = tokio::runtime::Builder::new_current_thread()
637645
.enable_time()
@@ -779,37 +787,46 @@ impl Actor {
779787
hash,
780788
len,
781789
reply,
782-
} => send_reply_with(reply, self, move |this| {
783-
let author = get_author(&mut this.store, &author)?;
784-
let mut replica = this.states.replica(namespace, &mut this.store)?;
785-
replica.insert(&key, &author, hash, len)?;
786-
this.metrics.new_entries_local.inc();
787-
this.metrics.new_entries_local_size.inc_by(len);
788-
Ok(())
789-
}),
790+
} => {
791+
send_reply_with_async(reply, self, async move |this| {
792+
let author = get_author(&mut this.store, &author)?;
793+
let mut replica = this.states.replica(namespace, &mut this.store)?;
794+
replica.insert(&key, &author, hash, len).await?;
795+
this.metrics.new_entries_local.inc();
796+
this.metrics.new_entries_local_size.inc_by(len);
797+
Ok(())
798+
})
799+
.await
800+
}
790801
ReplicaAction::DeletePrefix { author, key, reply } => {
791-
send_reply_with(reply, self, |this| {
802+
send_reply_with_async(reply, self, async |this| {
792803
let author = get_author(&mut this.store, &author)?;
793804
let mut replica = this.states.replica(namespace, &mut this.store)?;
794-
let res = replica.delete_prefix(&key, &author)?;
805+
let res = replica.delete_prefix(&key, &author).await?;
795806
Ok(res)
796807
})
808+
.await
797809
}
798810
ReplicaAction::InsertRemote {
799811
entry,
800812
from,
801813
content_status,
802814
reply,
803-
} => send_reply_with(reply, self, move |this| {
804-
let mut replica = this
805-
.states
806-
.replica_if_syncing(&namespace, &mut this.store)?;
807-
let len = entry.content_len();
808-
replica.insert_remote_entry(entry, from, content_status)?;
809-
this.metrics.new_entries_remote.inc();
810-
this.metrics.new_entries_remote_size.inc_by(len);
811-
Ok(())
812-
}),
815+
} => {
816+
send_reply_with_async(reply, self, async move |this| {
817+
let mut replica = this
818+
.states
819+
.replica_if_syncing(&namespace, &mut this.store)?;
820+
let len = entry.content_len();
821+
replica
822+
.insert_remote_entry(entry, from, content_status)
823+
.await?;
824+
this.metrics.new_entries_remote.inc();
825+
this.metrics.new_entries_remote_size.inc_by(len);
826+
Ok(())
827+
})
828+
.await
829+
}
813830

814831
ReplicaAction::SyncInitialMessage { reply } => {
815832
send_reply_with(reply, self, move |this| {
@@ -1064,6 +1081,14 @@ fn send_reply_with<T>(
10641081
sender.send(f(this)).map_err(send_reply_error)
10651082
}
10661083

1084+
async fn send_reply_with_async<T>(
1085+
sender: oneshot::Sender<Result<T>>,
1086+
this: &mut Actor,
1087+
f: impl AsyncFnOnce(&mut Actor) -> Result<T>,
1088+
) -> Result<(), SendReplyError> {
1089+
sender.send(f(this).await).map_err(send_reply_error)
1090+
}
1091+
10671092
fn send_reply_error<T>(_err: T) -> SendReplyError {
10681093
SendReplyError
10691094
}

src/api/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl RpcActor {
429429
return;
430430
}
431431
};
432-
tokio::task::spawn(async move {
432+
n0_future::task::spawn(async move {
433433
loop {
434434
tokio::select! {
435435
msg = stream.next() => {

src/engine.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22
//!
33
//! [`crate::Replica`] is also called documents here.
44
5-
use std::{
6-
path::PathBuf,
7-
str::FromStr,
8-
sync::{Arc, RwLock},
9-
};
5+
use std::sync::{Arc, RwLock};
106

11-
use anyhow::{bail, Context, Result};
7+
use anyhow::{bail, Result};
128
use futures_lite::{Stream, StreamExt};
139
use iroh::{Endpoint, EndpointAddr, PublicKey};
1410
use iroh_blobs::{
@@ -17,9 +13,9 @@ use iroh_blobs::{
1713
Hash,
1814
};
1915
use iroh_gossip::net::Gossip;
16+
use n0_future::task::AbortOnDropHandle;
2017
use serde::{Deserialize, Serialize};
2118
use tokio::sync::{mpsc, oneshot};
22-
use tokio_util::task::AbortOnDropHandle;
2319
use tracing::{debug, error, error_span, Instrument};
2420

2521
use self::live::{LiveActor, ToLiveActor};
@@ -128,7 +124,7 @@ impl Engine {
128124
live_actor_tx.clone(),
129125
sync.metrics().clone(),
130126
);
131-
let actor_handle = tokio::task::spawn(
127+
let actor_handle = n0_future::task::spawn(
132128
async move {
133129
if let Err(err) = actor.run().await {
134130
error!("sync actor failed: {err:?}");
@@ -355,7 +351,8 @@ pub enum DefaultAuthorStorage {
355351
/// Memory storage.
356352
Mem,
357353
/// File based persistent storage.
358-
Persistent(PathBuf),
354+
#[cfg(feature = "fs-store")]
355+
Persistent(std::path::PathBuf),
359356
}
360357

361358
impl DefaultAuthorStorage {
@@ -373,7 +370,10 @@ impl DefaultAuthorStorage {
373370
docs_store.import_author(author).await?;
374371
Ok(author_id)
375372
}
373+
#[cfg(feature = "fs-store")]
376374
Self::Persistent(ref path) => {
375+
use anyhow::Context;
376+
use std::str::FromStr;
377377
if path.exists() {
378378
let data = tokio::fs::read_to_string(path).await.with_context(|| {
379379
format!(
@@ -407,12 +407,14 @@ impl DefaultAuthorStorage {
407407
}
408408

409409
/// Save a new default author.
410-
pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> {
410+
pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> {
411411
match self {
412412
Self::Mem => {
413413
// persistence is not possible for the mem storage so this is a noop.
414414
}
415+
#[cfg(feature = "fs-store")]
415416
Self::Persistent(ref path) => {
417+
use anyhow::Context;
416418
tokio::fs::write(path, author_id.to_string())
417419
.await
418420
.with_context(|| {

src/engine/live.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
use std::{
44
collections::{HashMap, HashSet},
55
sync::Arc,
6-
time::SystemTime,
76
};
7+
use n0_future::time::SystemTime;
88

99
use anyhow::{Context, Result};
1010
use futures_lite::FutureExt;

src/engine/state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::{
22
collections::BTreeMap,
3-
time::{Instant, SystemTime},
3+
time::Instant,
44
};
5+
use n0_future::time::SystemTime;
56

67
use anyhow::Result;
78
use iroh::EndpointId;

src/net/codec.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,15 @@ mod tests {
322322
let alice_replica_id = alice_replica.id();
323323
alice_replica
324324
.hash_and_insert("hello bob", &author, "from alice")
325+
.await
325326
.unwrap();
326327

327328
let mut bob_store = store::Store::memory();
328329
let mut bob_replica = bob_store.new_replica(namespace.clone()).unwrap();
329330
let bob_replica_id = bob_replica.id();
330331
bob_replica
331332
.hash_and_insert("hello alice", &author, "from bob")
333+
.await
332334
.unwrap();
333335

334336
assert_eq!(
@@ -438,9 +440,9 @@ mod tests {
438440

439441
type Message = (AuthorId, Vec<u8>, Hash);
440442

441-
fn insert_messages(
443+
async fn insert_messages(
442444
mut rng: impl CryptoRng,
443-
replica: &mut crate::sync::Replica,
445+
replica: &mut crate::sync::Replica<'_>,
444446
num_authors: usize,
445447
msgs_per_author: usize,
446448
key_value_fn: impl Fn(&AuthorId, usize) -> (String, String),
@@ -453,7 +455,10 @@ mod tests {
453455
for i in 0..msgs_per_author {
454456
for author in authors.iter() {
455457
let (key, value) = key_value_fn(&author.id(), i);
456-
let hash = replica.hash_and_insert(key.clone(), author, value).unwrap();
458+
let hash = replica
459+
.hash_and_insert(key.clone(), author, value)
460+
.await
461+
.unwrap();
457462
res.push((author.id(), key.as_bytes().to_vec(), hash));
458463
}
459464
}
@@ -509,7 +514,8 @@ mod tests {
509514
format!("from alice by {author}: {i}"),
510515
)
511516
},
512-
);
517+
)
518+
.await;
513519
all_messages.extend_from_slice(&alice_messages);
514520

515521
let mut bob_replica = bob_store.new_replica(namespace.clone()).unwrap();
@@ -524,7 +530,8 @@ mod tests {
524530
format!("from bob by {author}: {i}"),
525531
)
526532
},
527-
);
533+
)
534+
.await;
528535
all_messages.extend_from_slice(&bob_messages);
529536

530537
all_messages.sort();
@@ -646,10 +653,12 @@ mod tests {
646653
// Insert into alice
647654
let hash_alice = alice_replica
648655
.hash_and_insert(&key, &author, &value_alice)
656+
.await
649657
.unwrap();
650658
// Insert into bob
651659
let hash_bob = bob_replica
652660
.hash_and_insert(&key, &author, &value_bob)
661+
.await
653662
.unwrap();
654663

655664
assert_eq!(

0 commit comments

Comments
 (0)