Skip to content

Commit 8774252

Browse files
committed
make things work
1 parent af708bf commit 8774252

File tree

7 files changed

+302
-231
lines changed

7 files changed

+302
-231
lines changed

src/actor.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ impl Actor {
649649
break reply;
650650
}
651651
action => {
652-
if self.on_action(action).is_err() {
652+
if self.on_action(action).await.is_err() {
653653
warn!("failed to send reply: receiver dropped");
654654
}
655655
}
@@ -667,7 +667,7 @@ impl Actor {
667667
}
668668
}
669669

670-
fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
670+
async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
671671
match action {
672672
Action::Shutdown { .. } => {
673673
unreachable!("Shutdown is handled in run()")
@@ -711,11 +711,11 @@ impl Actor {
711711
send_reply_with(reply, self, |this| this.store.content_hashes())
712712
}
713713
Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
714-
Action::Replica(namespace, action) => self.on_replica_action(namespace, action),
714+
Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
715715
}
716716
}
717717

718-
fn on_replica_action(
718+
async fn on_replica_action(
719719
&mut self,
720720
namespace: NamespaceId,
721721
action: ReplicaAction,
@@ -801,13 +801,19 @@ impl Actor {
801801
from,
802802
mut state,
803803
reply,
804-
} => send_reply_with(reply, self, move |this| {
805-
let mut replica = this
806-
.states
807-
.replica_if_syncing(&namespace, &mut this.store)?;
808-
let res = replica.sync_process_message(message, from, &mut state)?;
809-
Ok((res, state))
810-
}),
804+
} => {
805+
let res = async {
806+
let mut replica = self
807+
.states
808+
.replica_if_syncing(&namespace, &mut self.store)?;
809+
let res = replica
810+
.sync_process_message(message, from, &mut state)
811+
.await?;
812+
Ok((res, state))
813+
}
814+
.await;
815+
reply.send(res).map_err(send_reply_error)
816+
}
811817
ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
812818
this.states.ensure_open(&namespace)?;
813819
let peers = this.store.get_sync_peers(&namespace)?;

src/engine.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
//! [`crate::Replica`] is also called documents here.
44
55
use std::{
6-
io,
76
path::PathBuf,
87
str::FromStr,
98
sync::{Arc, RwLock},
@@ -76,13 +75,13 @@ impl Engine {
7675
let me = endpoint.node_id().fmt_short();
7776

7877
let content_status_cb: ContentStatusCallback = {
79-
let bao_store = bao_store.clone();
78+
let blobs = bao_store.blobs().clone();
8079
Arc::new(move |hash: iroh_blobs::Hash| {
81-
let fut = Box::pin(async move {
82-
let blob_status = bao_store.blobs().status(hash).await;
80+
let blobs = blobs.clone();
81+
Box::pin(async move {
82+
let blob_status = blobs.status(hash).await;
8383
entry_to_content_status(blob_status)
84-
});
85-
fut
84+
})
8685
})
8786
};
8887
let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());
@@ -205,25 +204,26 @@ impl Engine {
205204
&self,
206205
namespace: NamespaceId,
207206
) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
208-
let content_status_cb = self.content_status_cb.clone();
209-
210207
// Create a future that sends channel senders to the respective actors.
211208
// We clone `self` so that the future does not capture any lifetimes.
212-
let this = self;
209+
let content_status_cb = self.content_status_cb.clone();
213210

214211
// Subscribe to insert events from the replica.
215212
let a = {
216213
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
217-
this.sync.subscribe(namespace, s).await?;
218-
Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
214+
self.sync.subscribe(namespace, s).await?;
215+
Box::pin(r).then(move |ev| {
216+
let content_status_cb = content_status_cb.clone();
217+
Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
218+
})
219219
};
220220

221221
// Subscribe to events from the [`live::Actor`].
222222
let b = {
223223
let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
224224
let r = Box::pin(r);
225225
let (reply, reply_rx) = oneshot::channel();
226-
this.to_live_actor
226+
self.to_live_actor
227227
.send(ToLiveActor::Subscribe {
228228
namespace,
229229
sender: s,
@@ -259,8 +259,8 @@ impl Engine {
259259
/// Converts an [`BlobStatus`] into a ['ContentStatus'].
260260
fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
261261
match entry {
262-
Ok(BlobStatus::Complete) => ContentStatus::Complete,
263-
Ok(BlobStatus::Partial) => ContentStatus::Incomplete,
262+
Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
263+
Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
264264
Ok(BlobStatus::NotFound) => ContentStatus::Missing,
265265
Err(cause) => {
266266
tracing::warn!("Error while checking entry status: {cause:?}");
@@ -322,7 +322,7 @@ impl From<live::Event> for LiveEvent {
322322
}
323323

324324
impl LiveEvent {
325-
fn from_replica_event(
325+
async fn from_replica_event(
326326
ev: crate::Event,
327327
content_status_cb: &ContentStatusCallback,
328328
) -> Result<Self> {
@@ -331,7 +331,7 @@ impl LiveEvent {
331331
entry: entry.into(),
332332
},
333333
crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
334-
content_status: content_status_cb(entry.content_hash()),
334+
content_status: content_status_cb(entry.content_hash()).await,
335335
entry: entry.into(),
336336
from: PublicKey::from_bytes(&from)?,
337337
},

src/engine/live.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
1212
use iroh_blobs::{
1313
api::{
1414
blobs::BlobStatus,
15-
downloader::{DownloadRequest, Downloader},
15+
downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
1616
Store,
1717
},
18-
get::Stats,
1918
Hash, HashAndFormat,
2019
};
2120
use iroh_gossip::net::Gossip;
@@ -148,7 +147,7 @@ type SyncConnectRes = (
148147
Result<SyncFinished, ConnectError>,
149148
);
150149
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
151-
type DownloadRes = (NamespaceId, Hash, Result<Stats, DownloadError>);
150+
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
152151

153152
// Currently peers might double-sync in both directions.
154153
pub struct LiveActor {
@@ -177,6 +176,8 @@ pub struct LiveActor {
177176
missing_hashes: HashSet<Hash>,
178177
/// Content hashes queued in downloader.
179178
queued_hashes: QueuedHashes,
179+
/// Nodes known to have a hash
180+
hash_providers: ProviderNodes,
180181

181182
/// Subscribers to actor events
182183
subscribers: SubscribersMap,
@@ -217,6 +218,7 @@ impl LiveActor {
217218
state: Default::default(),
218219
missing_hashes: Default::default(),
219220
queued_hashes: Default::default(),
221+
hash_providers: Default::default(),
220222
metrics,
221223
}
222224
}
@@ -629,7 +631,7 @@ impl LiveActor {
629631
}
630632
}
631633

632-
async fn broadcast_neighbors(&self, namespace: NamespaceId, op: &Op) {
634+
async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
633635
if !self.state.is_syncing(&namespace) {
634636
return;
635637
}
@@ -651,7 +653,7 @@ impl LiveActor {
651653
&mut self,
652654
namespace: NamespaceId,
653655
hash: Hash,
654-
res: Result<Stats, DownloadError>,
656+
res: Result<(), anyhow::Error>,
655657
) {
656658
let completed_namespaces = self.queued_hashes.remove_hash(&hash);
657659
debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
@@ -758,12 +760,22 @@ impl LiveActor {
758760
self.missing_hashes.remove(&hash);
759761
return;
760762
}
763+
self.hash_providers
764+
.0
765+
.lock()
766+
.expect("poisoned")
767+
.entry(hash)
768+
.or_default()
769+
.insert(node);
761770
if self.queued_hashes.contains_hash(&hash) {
762771
self.queued_hashes.insert(hash, namespace);
763-
self.downloader.nodes_have(hash, vec![node]).await;
764772
} else if !only_if_missing || self.missing_hashes.contains(&hash) {
765-
let req = DownloadRequest::new(HashAndFormat::raw(hash), vec![node]);
766-
let handle = self.downloader.queue(req).await;
773+
let req = DownloadRequest::new(
774+
HashAndFormat::raw(hash),
775+
self.hash_providers.clone(),
776+
SplitStrategy::None,
777+
);
778+
let handle = self.downloader.download_with_opts(req);
767779

768780
self.queued_hashes.insert(hash, namespace);
769781
self.missing_hashes.remove(&hash);
@@ -885,6 +897,24 @@ struct QueuedHashes {
885897
by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
886898
}
887899

900+
#[derive(Debug, Clone, Default)]
901+
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<NodeId>>>>);
902+
903+
impl ContentDiscovery for ProviderNodes {
904+
fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
905+
let nodes = self
906+
.0
907+
.lock()
908+
.expect("poisoned")
909+
.get(&hash.hash)
910+
.into_iter()
911+
.flatten()
912+
.cloned()
913+
.collect::<Vec<_>>();
914+
Box::pin(n0_future::stream::iter(nodes))
915+
}
916+
}
917+
888918
impl QueuedHashes {
889919
fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
890920
self.by_hash.entry(hash).or_default().insert(namespace);

0 commit comments

Comments
 (0)