Skip to content

Commit 9274fd1

Browse files
committed
feat: RPC for subscriptions
1 parent f8bd54a commit 9274fd1

File tree

6 files changed

+188
-35
lines changed

6 files changed

+188
-35
lines changed

iroh-willow/src/engine/actor.rs

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{sync::Arc, thread::JoinHandle};
22

33
use anyhow::Result;
4-
use futures_lite::stream::Stream;
4+
use futures_lite::{stream::Stream, StreamExt};
55
use iroh_base::key::NodeId;
66
use tokio::{
77
sync::{mpsc, oneshot},
@@ -16,13 +16,16 @@ use crate::{
1616
net::ConnHandle,
1717
proto::{
1818
data_model::{AuthorisedEntry, Path, SubspaceId},
19-
grouping::Range3d,
19+
grouping::{Area, Range3d},
2020
keys::{NamespaceId, NamespaceKind, UserId, UserSecretKey},
2121
meadowcap::{self, AccessMode},
2222
},
2323
session::{intents::Intent, run_session, Error, EventSender, SessionHandle},
2424
store::{
25-
traits::{EntryOrigin, EntryReader, EntryStorage, SecretStorage, Storage},
25+
traits::{
26+
EntryOrigin, EntryReader, EntryStorage, SecretStorage, Storage, StoreEvent,
27+
SubscribeParams,
28+
},
2629
Store,
2730
},
2831
};
@@ -211,6 +214,42 @@ impl ActorHandle {
211214
reply_rx.await?;
212215
Ok(())
213216
}
217+
218+
pub async fn subscribe_area(
219+
&self,
220+
namespace: NamespaceId,
221+
area: Area,
222+
params: SubscribeParams,
223+
sender: mpsc::Sender<StoreEvent>,
224+
) -> Result<()> {
225+
self.send(Input::SubscribeArea {
226+
namespace,
227+
area,
228+
params,
229+
sender,
230+
})
231+
.await?;
232+
Ok(())
233+
}
234+
235+
pub async fn resume_subscription(
236+
&self,
237+
progress_id: u64,
238+
namespace: NamespaceId,
239+
area: Area,
240+
params: SubscribeParams,
241+
sender: mpsc::Sender<StoreEvent>,
242+
) -> Result<()> {
243+
self.send(Input::ResumeSubscription {
244+
progress_id,
245+
namespace,
246+
area,
247+
params,
248+
sender,
249+
})
250+
.await?;
251+
Ok(())
252+
}
214253
}
215254

216255
impl Drop for ActorHandle {
@@ -299,6 +338,19 @@ pub enum Input {
299338
#[debug(skip)]
300339
reply: Option<oneshot::Sender<()>>,
301340
},
341+
SubscribeArea {
342+
namespace: NamespaceId,
343+
area: Area,
344+
params: SubscribeParams,
345+
sender: mpsc::Sender<StoreEvent>,
346+
},
347+
ResumeSubscription {
348+
progress_id: u64,
349+
namespace: NamespaceId,
350+
area: Area,
351+
params: SubscribeParams,
352+
sender: mpsc::Sender<StoreEvent>,
353+
},
302354
}
303355

304356
#[derive(Debug)]
@@ -479,6 +531,44 @@ impl<S: Storage> Actor<S> {
479531
let res = self.store.auth().resolve_interests(interests);
480532
send_reply(reply, res.map_err(anyhow::Error::from))
481533
}
534+
Input::SubscribeArea {
535+
namespace,
536+
area,
537+
params,
538+
sender,
539+
} => {
540+
let store = self.store.clone();
541+
self.tasks.spawn_local(async move {
542+
let mut stream = store.entries().subscribe_area(namespace, area, params);
543+
while let Some(event) = stream.next().await {
544+
if let Err(_) = sender.send(event).await {
545+
break;
546+
}
547+
}
548+
});
549+
Ok(())
550+
}
551+
Input::ResumeSubscription {
552+
progress_id,
553+
namespace,
554+
area,
555+
params,
556+
sender,
557+
} => {
558+
let store = self.store.clone();
559+
self.tasks.spawn_local(async move {
560+
let mut stream =
561+
store
562+
.entries()
563+
.resume_subscription(progress_id, namespace, area, params);
564+
while let Some(event) = stream.next().await {
565+
if let Err(_) = sender.send(event).await {
566+
break;
567+
}
568+
}
569+
});
570+
Ok(())
571+
}
482572
}
483573
}
484574
}

iroh-willow/src/store/memory.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,16 +257,12 @@ impl EntryStore {
257257
}
258258
}
259259
for i in to_prune {
260-
let pruned = entries.remove(i).into_parts().0;
261-
self.events.insert(|id| {
260+
let pruned = entries.remove(i);
261+
self.events.insert(move |id| {
262262
StoreEvent::Pruned(
263263
id,
264264
traits::PruneEvent {
265-
pruned: (
266-
pruned.namespace_id().clone(),
267-
pruned.subspace_id().clone(),
268-
pruned.path().clone(),
269-
),
265+
pruned,
270266
by: entry.clone(),
271267
},
272268
)
@@ -404,7 +400,7 @@ impl<T> Default for EventQueue<T> {
404400
}
405401

406402
impl<T: Clone> EventQueue<T> {
407-
fn insert(&mut self, f: impl Fn(u64) -> T) {
403+
fn insert(&mut self, f: impl FnOnce(u64) -> T) {
408404
let progress_id = self.next_progress_id();
409405
let event = f(progress_id);
410406
self.events.push_back(event);

iroh-willow/src/store/traits.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ use std::fmt::Debug;
44

55
use anyhow::Result;
66
use futures_lite::Stream;
7+
use serde::{Deserialize, Serialize};
78

89
use crate::{
910
interest::{CapSelector, CapabilityPack},
1011
proto::{
11-
data_model::{AuthorisedEntry, Entry, NamespaceId, Path, SubspaceId, WriteCapability},
12+
data_model::{
13+
self, AuthorisedEntry, Entry, NamespaceId, Path, SubspaceId, WriteCapability,
14+
},
1215
grouping::{Area, Range3d},
1316
keys::{NamespaceSecretKey, NamespaceSignature, UserId, UserSecretKey, UserSignature},
1417
meadowcap::{self, ReadAuthorisation},
@@ -209,10 +212,14 @@ pub trait CapsStorage: Debug + Clone {
209212

210213
/// An event which took place within a [`EntryStorage`].
211214
/// Each event includes a *progress ID* which can be used to *resume* a subscription at any point in the future.
212-
#[derive(Debug, Clone)]
215+
#[derive(Debug, Clone, Serialize, Deserialize)]
213216
pub enum StoreEvent {
214217
/// A new entry was ingested.
215-
Ingested(u64, AuthorisedEntry, EntryOrigin),
218+
Ingested(
219+
u64,
220+
#[serde(with = "data_model::serde_encoding::authorised_entry")] AuthorisedEntry,
221+
EntryOrigin,
222+
),
216223
// PayloadForgotten(u64, PD),
217224
/// An entry was pruned via prefix pruning.
218225
Pruned(u64, PruneEvent),
@@ -249,9 +256,8 @@ impl StoreEvent {
249256
}
250257
StoreEvent::Pruned(_, PruneEvent { pruned, by: _ }) => {
251258
if !params.ingest_only
252-
&& pruned.0 == namespace_id
253-
&& area.subspace().includes(&pruned.1)
254-
&& area.path().is_prefix_of(&pruned.2)
259+
&& *pruned.entry().namespace_id() == namespace_id
260+
&& area.includes_entry(pruned.entry())
255261
{
256262
true
257263
} else {
@@ -263,16 +269,17 @@ impl StoreEvent {
263269
}
264270

265271
/// Describes an [`AuthorisedEntry`] which was pruned and the [`AuthorisedEntry`] which triggered the pruning.
266-
#[derive(Debug, Clone)]
272+
#[derive(Debug, Clone, Serialize, Deserialize)]
267273
pub struct PruneEvent {
268-
/// The subspace ID and path of the entry which was pruned.
269-
pub pruned: (NamespaceId, SubspaceId, Path),
274+
#[serde(with = "data_model::serde_encoding::authorised_entry")]
275+
pub pruned: AuthorisedEntry,
270276
/// The entry which triggered the pruning.
277+
#[serde(with = "data_model::serde_encoding::authorised_entry")]
271278
pub by: AuthorisedEntry,
272279
}
273280

274281
/// The origin of an entry ingestion event.
275-
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
282+
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
276283
pub enum EntryOrigin {
277284
/// The entry was probably created on this machine.
278285
Local,
@@ -287,7 +294,7 @@ pub enum EntryChannel {
287294
}
288295

289296
/// Describes which entries to ignore during a query.
290-
#[derive(Debug, Default)]
297+
#[derive(Debug, Default, Serialize, Deserialize)]
291298
pub struct SubscribeParams {
292299
/// Omit entries whose payload is the empty string.
293300
pub ignore_empty_payloads: bool,

iroh/src/client/spaces.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use iroh_willow::{
3838
intents::{serde_encoding::Event, Completion, IntentUpdate},
3939
SessionInit,
4040
},
41+
store::traits::{StoreEvent, SubscribeParams},
4142
};
4243
use ref_cast::RefCast;
4344
use serde::{Deserialize, Serialize};
@@ -353,14 +354,39 @@ impl Space {
353354
})
354355
}
355356

356-
/// TODO
357-
pub fn subscribe(&self, _area: Area) {
358-
todo!()
357+
/// Subscribe to events concerning entries included by an `Area`.
358+
pub async fn subscribe_area(
359+
&self,
360+
area: Area,
361+
params: SubscribeParams,
362+
) -> Result<impl Stream<Item = Result<StoreEvent>>> {
363+
let req = SubscribeRequest {
364+
namespace: self.namespace_id,
365+
area,
366+
params,
367+
initial_progress_id: None,
368+
};
369+
let stream = self.rpc.try_server_streaming(req).await?;
370+
let stream = stream.map(|item| item.map_err(anyhow::Error::from));
371+
Ok(stream)
359372
}
360373

361-
/// TODO
362-
pub fn subscribe_offset(&self, _area: Area, _offset: u64) {
363-
todo!()
374+
/// Resume a subscription using a progress ID obtained from a previous subscription.
375+
pub async fn resume_subscription(
376+
&self,
377+
progress_id: u64,
378+
area: Area,
379+
params: SubscribeParams,
380+
) -> Result<impl Stream<Item = Result<StoreEvent>>> {
381+
let req = SubscribeRequest {
382+
namespace: self.namespace_id,
383+
area,
384+
params,
385+
initial_progress_id: Some(progress_id),
386+
};
387+
let stream = self.rpc.try_server_streaming(req).await?;
388+
let stream = stream.map(|item| item.map_err(anyhow::Error::from));
389+
Ok(stream)
364390
}
365391
}
366392

iroh/src/node/rpc/spaces.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,32 @@ impl<D: Store> Handler<D> {
155155
.await
156156
}
157157
SyncWithPeerUpdate(_) => Err(RpcServerError::UnexpectedStartMessage),
158+
Subscribe(msg) => {
159+
chan.try_server_streaming(msg, self, |handler, req| async move {
160+
let (tx, rx) = mpsc::channel(1024);
161+
if let Some(progress_id) = req.initial_progress_id {
162+
handler
163+
.spaces()?
164+
.resume_subscription(
165+
progress_id,
166+
req.namespace,
167+
req.area,
168+
req.params,
169+
tx,
170+
)
171+
.await
172+
.map_err(map_err)?;
173+
} else {
174+
handler
175+
.spaces()?
176+
.subscribe_area(req.namespace, req.area, req.params, tx)
177+
.await
178+
.map_err(map_err)?;
179+
}
180+
Ok(ReceiverStream::new(rx).map(Ok))
181+
})
182+
.await
183+
}
158184
}
159185
}
160186
}

iroh/src/rpc_protocol/spaces.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ use iroh_willow::{
99
self, serde_encoding::SerdeAuthorisedEntry, AuthorisedEntry, Entry, NamespaceId, Path,
1010
SubspaceId,
1111
},
12-
grouping::{self, Range3d},
12+
grouping::{self, Area, Range3d},
1313
keys::{NamespaceKind, UserId},
1414
meadowcap::{self, AccessMode, SecretKey},
1515
},
1616
session::{
1717
intents::{serde_encoding::Event, IntentUpdate},
1818
SessionInit,
1919
},
20+
store::traits::{StoreEvent, SubscribeParams},
2021
};
2122
use nested_enum_utils::enum_conversions;
2223
use quic_rpc_derive::rpc_requests;
@@ -47,11 +48,11 @@ pub enum Request {
4748
DelegateCaps(DelegateCapsRequest),
4849
#[rpc(response = RpcResult<ImportCapsResponse>)]
4950
ImportCaps(ImportCapsRequest),
50-
// #[rpc(response = RpcResult<ResolveInterestsResponse>)]
51-
// ResolveInterests(ResolveInterestsRequest),
5251
#[bidi_streaming(update = SyncWithPeerUpdate, response = RpcResult<SyncWithPeerResponse>)]
5352
SyncWithPeer(SyncWithPeerRequest),
5453
SyncWithPeerUpdate(SyncWithPeerUpdate),
54+
#[try_server_streaming(create_error = RpcError, item_error = RpcError, item = StoreEvent)]
55+
Subscribe(SubscribeRequest),
5556
}
5657

5758
#[allow(missing_docs)]
@@ -67,8 +68,8 @@ pub enum Response {
6768
CreateUser(RpcResult<CreateUserResponse>),
6869
DelegateCaps(RpcResult<DelegateCapsResponse>),
6970
ImportCaps(RpcResult<ImportCapsResponse>),
70-
// ResolveInterests(RpcResult<ResolveInterestsResponse>),
7171
SyncWithPeer(RpcResult<SyncWithPeerResponse>),
72+
Subscribe(RpcResult<StoreEvent>),
7273
}
7374

7475
#[derive(Debug, Serialize, Deserialize)]
@@ -139,9 +140,7 @@ pub struct GetEntryRequest {
139140
}
140141

141142
#[derive(Debug, Serialize, Deserialize)]
142-
pub struct GetEntryResponse(
143-
pub Option<SerdeAuthorisedEntry>, // #[serde(with = "data_model::serde_encoding::authorised_entry")] pub AuthorisedEntry,
144-
);
143+
pub struct GetEntryResponse(pub Option<SerdeAuthorisedEntry>);
145144

146145
#[derive(Debug, Serialize, Deserialize)]
147146
pub struct CreateNamespaceRequest {
@@ -192,6 +191,15 @@ pub enum SyncWithPeerResponse {
192191
Event(Event),
193192
}
194193

194+
#[derive(Debug, Serialize, Deserialize)]
195+
pub struct SubscribeRequest {
196+
pub namespace: NamespaceId,
197+
#[serde(with = "grouping::serde_encoding::area")]
198+
pub area: Area,
199+
pub params: SubscribeParams,
200+
pub initial_progress_id: Option<u64>,
201+
}
202+
195203
/// Either a complete [`Entry`] or a [`FullEntryForm`].
196204
#[derive(Debug, Serialize, Deserialize)]
197205
pub enum EntryOrForm {

0 commit comments

Comments
 (0)