Skip to content

Commit d4e99c3

Browse files
committed
fix: fixes to subscriptions and add test
1 parent 9274fd1 commit d4e99c3

File tree

3 files changed

+134
-21
lines changed

3 files changed

+134
-21
lines changed

iroh-willow/src/store/memory.rs

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,12 @@ impl traits::SecretStorage for Rc<RefCell<SecretStore>> {
103103

104104
#[derive(Debug, Default)]
105105
pub struct EntryStore {
106-
entries: HashMap<NamespaceId, Vec<AuthorisedEntry>>,
106+
stores: HashMap<NamespaceId, NamespaceStore>,
107+
}
108+
109+
#[derive(Debug, Default)]
110+
pub struct NamespaceStore {
111+
entries: Vec<AuthorisedEntry>,
107112
events: EventQueue<StoreEvent>,
108113
}
109114

@@ -199,8 +204,9 @@ impl traits::EntryReader for Rc<RefCell<EntryStore>> {
199204
range: &Range3d,
200205
) -> impl Iterator<Item = Result<AuthorisedEntry>> + 'a {
201206
let slf = self.borrow();
202-
slf.entries
207+
slf.stores
203208
.get(&namespace)
209+
.map(|s| &s.entries)
204210
.into_iter()
205211
.flatten()
206212
.filter(|entry| range.includes_entry(entry.entry()))
@@ -216,10 +222,11 @@ impl traits::EntryReader for Rc<RefCell<EntryStore>> {
216222
path: &Path,
217223
) -> Result<Option<AuthorisedEntry>> {
218224
let inner = self.borrow();
219-
let Some(entries) = inner.entries.get(&namespace) else {
225+
let Some(entries) = inner.stores.get(&namespace) else {
220226
return Ok(None);
221227
};
222228
Ok(entries
229+
.entries
223230
.iter()
224231
.find(|e| {
225232
let e = e.entry();
@@ -231,10 +238,11 @@ impl traits::EntryReader for Rc<RefCell<EntryStore>> {
231238

232239
impl EntryStore {
233240
fn ingest_entry(&mut self, entry: &AuthorisedEntry, origin: EntryOrigin) -> Result<bool> {
234-
let entries = self
235-
.entries
241+
let store = self
242+
.stores
236243
.entry(*entry.entry().namespace_id())
237244
.or_default();
245+
let entries = &mut store.entries;
238246
let new = entry.entry();
239247
let mut to_prune = vec![];
240248
for (i, existing) in entries.iter().enumerate() {
@@ -258,7 +266,7 @@ impl EntryStore {
258266
}
259267
for i in to_prune {
260268
let pruned = entries.remove(i);
261-
self.events.insert(move |id| {
269+
store.events.insert(move |id| {
262270
StoreEvent::Pruned(
263271
id,
264272
traits::PruneEvent {
@@ -269,7 +277,8 @@ impl EntryStore {
269277
});
270278
}
271279
entries.push(entry.clone());
272-
self.events
280+
store
281+
.events
273282
.insert(|id| StoreEvent::Ingested(id, entry.clone(), origin));
274283
Ok(true)
275284
}
@@ -284,11 +293,23 @@ impl traits::EntryStorage for Rc<RefCell<EntryStore>> {
284293
}
285294

286295
fn snapshot(&self) -> Result<Self::Snapshot> {
287-
let entries = self.borrow().entries.clone();
288-
Ok(Rc::new(RefCell::new(EntryStore {
289-
entries,
290-
events: EventQueue::default(),
291-
})))
296+
// This is quite ugly. But this is a quick memory impl only.
297+
// But we should really maybe strive to not expose snapshots.
298+
let stores = self
299+
.borrow()
300+
.stores
301+
.iter()
302+
.map(|(key, value)| {
303+
(
304+
*key,
305+
NamespaceStore {
306+
entries: value.entries.clone(),
307+
events: Default::default(),
308+
},
309+
)
310+
})
311+
.collect();
312+
Ok(Rc::new(RefCell::new(EntryStore { stores })))
292313
}
293314

294315
fn ingest_entry(&self, entry: &AuthorisedEntry, origin: EntryOrigin) -> Result<bool> {
@@ -302,11 +323,18 @@ impl traits::EntryStorage for Rc<RefCell<EntryStore>> {
302323
area: Area,
303324
params: SubscribeParams,
304325
) -> impl Stream<Item = traits::StoreEvent> + Unpin + 'static {
326+
let progress_id = self
327+
.borrow_mut()
328+
.stores
329+
.entry(namespace)
330+
.or_default()
331+
.events
332+
.next_progress_id();
305333
EventStream {
306334
area,
307335
params,
308336
namespace,
309-
progress_id: self.borrow().events.next_progress_id(),
337+
progress_id,
310338
store: Rc::downgrade(&self),
311339
}
312340
}
@@ -345,16 +373,18 @@ struct EventStream {
345373
impl Stream for EventStream {
346374
type Item = StoreEvent;
347375
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
348-
let Some(store) = self.store.upgrade() else {
376+
let Some(inner) = self.store.upgrade() else {
349377
return Poll::Ready(None);
350378
};
351-
let mut store = store.borrow_mut();
379+
let mut inner_mut = inner.borrow_mut();
380+
let store = inner_mut.stores.entry(self.namespace).or_default();
352381
let res = ready!(store.events.poll_next(
353382
self.progress_id,
354383
|e| e.matches(self.namespace, &self.area, &self.params),
355384
cx,
356385
));
357-
drop(store);
386+
drop(inner_mut);
387+
drop(inner);
358388
Poll::Ready(match res {
359389
None => None,
360390
Some((next_id, event)) => {

iroh/src/client/spaces.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use iroh_willow::{
3636
},
3737
session::{
3838
intents::{serde_encoding::Event, Completion, IntentUpdate},
39-
SessionInit,
39+
SessionInit, SessionMode,
4040
},
4141
store::traits::{StoreEvent, SubscribeParams},
4242
};
@@ -99,7 +99,11 @@ impl Client {
9999
}
100100

101101
/// Import a ticket and start to synchronize.
102-
pub async fn import_and_sync(&self, ticket: SpaceTicket) -> Result<(Space, SyncHandleSet)> {
102+
pub async fn import_and_sync(
103+
&self,
104+
ticket: SpaceTicket,
105+
mode: SessionMode,
106+
) -> Result<(Space, SyncHandleSet)> {
103107
if ticket.caps.is_empty() {
104108
anyhow::bail!("Invalid ticket: Does not include any capabilities");
105109
}
@@ -111,7 +115,7 @@ impl Client {
111115

112116
self.import_caps(ticket.caps).await?;
113117
let interests = Interests::builder().add_full_cap(CapSelector::any(namespace));
114-
let init = SessionInit::reconcile_once(interests);
118+
let init = SessionInit::new(interests, mode);
115119
let mut intents = SyncHandleSet::default();
116120
for addr in ticket.nodes {
117121
let node_id = addr.node_id;

iroh/tests/spaces.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use iroh_willow::{
1010
keys::NamespaceKind,
1111
meadowcap::AccessMode,
1212
},
13-
session::intents::Completion,
13+
session::{intents::Completion, SessionMode},
14+
store::traits::{EntryOrigin, StoreEvent},
1415
};
1516
use tracing::info;
1617

@@ -75,7 +76,10 @@ async fn spaces_smoke() -> Result<()> {
7576
.await?;
7677

7778
println!("ticket {ticket:?}");
78-
let (betty_space, betty_sync_intent) = betty.spaces().import_and_sync(ticket).await?;
79+
let (betty_space, betty_sync_intent) = betty
80+
.spaces()
81+
.import_and_sync(ticket, SessionMode::ReconcileOnce)
82+
.await?;
7983

8084
let mut completion = betty_sync_intent.complete_all().await;
8185
assert_eq!(completion.len(), 1);
@@ -132,3 +136,78 @@ async fn spaces_smoke() -> Result<()> {
132136

133137
Ok(())
134138
}
139+
140+
#[tokio::test]
141+
async fn spaces_subscription() -> Result<()> {
142+
iroh_test::logging::setup_multithreaded();
143+
let (alfie_addr, alfie) = spawn_node().await;
144+
let (betty_addr, betty) = spawn_node().await;
145+
info!("alfie is {}", alfie_addr.node_id.fmt_short());
146+
info!("betty is {}", betty_addr.node_id.fmt_short());
147+
148+
let betty_user = betty.spaces().create_user().await?;
149+
let alfie_user = alfie.spaces().create_user().await?;
150+
let alfie_space = alfie
151+
.spaces()
152+
.create(NamespaceKind::Owned, alfie_user)
153+
.await?;
154+
155+
let _namespace = alfie_space.namespace_id();
156+
157+
let mut alfie_sub = alfie_space
158+
.subscribe_area(Area::new_full(), Default::default())
159+
.await?;
160+
161+
let ticket = alfie_space
162+
.share(betty_user, AccessMode::Write, RestrictArea::None)
163+
.await?;
164+
165+
let (betty_space, betty_sync_intent) = betty
166+
.spaces()
167+
.import_and_sync(ticket, SessionMode::ReconcileOnce)
168+
.await?;
169+
170+
let _sync_task = tokio::task::spawn(async move {
171+
// TODO: We should add a "detach" method to a sync intent!
172+
// (leaves the sync running but stop consuming events)
173+
let _ = betty_sync_intent.complete_all().await;
174+
});
175+
176+
let mut betty_sub = betty_space
177+
.resume_subscription(0, Area::new_full(), Default::default())
178+
.await?;
179+
180+
alfie_space
181+
.insert_bytes(
182+
EntryForm::new(alfie_user, Path::from_bytes(&[b"foo"])?),
183+
"hi",
184+
)
185+
.await?;
186+
187+
betty_space
188+
.insert_bytes(
189+
EntryForm::new(betty_user, Path::from_bytes(&[b"foo"])?),
190+
"hi",
191+
)
192+
.await?;
193+
194+
alfie_space
195+
.insert_bytes(
196+
EntryForm::new(alfie_user, Path::from_bytes(&[b"foo"])?),
197+
"hi!!",
198+
)
199+
.await?;
200+
201+
let ev = alfie_sub.next().await.unwrap();
202+
println!("ALFIE 2");
203+
assert!(matches!(ev, StoreEvent::Ingested(0, _, EntryOrigin::Local)));
204+
205+
let ev = betty_sub.next().await.unwrap();
206+
println!("BETTY 2");
207+
assert!(matches!(
208+
ev,
209+
StoreEvent::Ingested(0, _, EntryOrigin::Remote(_))
210+
));
211+
212+
Ok(())
213+
}

0 commit comments

Comments
 (0)