Skip to content

Commit e5ac6dd

Browse files
committed
WIP make provider events a proper irpc protocol and allow configuring notifications/requests for each event type.
1 parent b1880e1 commit e5ac6dd

File tree

4 files changed

+294
-7
lines changed

4 files changed

+294
-7
lines changed

Cargo.lock

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ self_cell = "1.1.0"
4242
genawaiter = { version = "0.99.1", features = ["futures03"] }
4343
iroh-base = "0.91.1"
4444
reflink-copy = "0.1.24"
45-
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
45+
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false, path = "../irpc" }
4646
iroh-metrics = { version = "0.35" }
4747

4848
[dev-dependencies]

src/provider.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use iroh::{
2020
};
2121
use irpc::channel::oneshot;
2222
use n0_future::StreamExt;
23-
use serde::de::DeserializeOwned;
23+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
2424
use tokio::{io::AsyncRead, select, sync::mpsc};
2525
use tracing::{debug, debug_span, error, warn, Instrument};
2626

@@ -33,6 +33,7 @@ use crate::{
3333
},
3434
Hash,
3535
};
36+
mod event_proto;
3637

3738
/// Provider progress events, to keep track of what the provider is doing.
3839
///
@@ -129,7 +130,7 @@ pub enum Event {
129130
}
130131

131132
/// Statistics about a successful or failed transfer.
132-
#[derive(Debug)]
133+
#[derive(Debug, Serialize, Deserialize)]
133134
pub struct TransferStats {
134135
/// The number of bytes sent that are part of the payload.
135136
pub payload_bytes_sent: u64,

src/provider/event_proto.rs

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
use std::fmt::Debug;
2+
3+
use iroh::NodeId;
4+
use irpc::{
5+
channel::{none::NoSender, oneshot},
6+
rpc_requests,
7+
};
8+
use serde::{Deserialize, Serialize};
9+
use snafu::Snafu;
10+
11+
use crate::{protocol::ChunkRangesSeq, provider::TransferStats, Hash};
12+
13+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
14+
#[repr(u8)]
15+
pub enum EventMode {
16+
/// We don't get these kinds of events at all
17+
#[default]
18+
None,
19+
/// We get a notification for these kinds of events
20+
Notify,
21+
/// We can respond to these kinds of events, either by aborting or by
22+
/// e.g. introducing a delay for throttling.
23+
Request,
24+
}
25+
26+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27+
#[repr(u8)]
28+
pub enum EventMode2 {
29+
/// We don't get these kinds of events at all
30+
#[default]
31+
None,
32+
/// We get a notification for these kinds of events
33+
Notify,
34+
}
35+
36+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37+
pub enum AbortReason {
38+
RateLimited,
39+
Permission,
40+
}
41+
42+
#[derive(Debug, Snafu)]
43+
pub enum ClientError {
44+
RateLimited,
45+
Permission,
46+
#[snafu(transparent)]
47+
Irpc {
48+
source: irpc::Error,
49+
},
50+
}
51+
52+
impl From<AbortReason> for ClientError {
53+
fn from(value: AbortReason) -> Self {
54+
match value {
55+
AbortReason::RateLimited => ClientError::RateLimited,
56+
AbortReason::Permission => ClientError::Permission,
57+
}
58+
}
59+
}
60+
61+
pub type EventResult = Result<(), AbortReason>;
62+
pub type ClientResult = Result<(), ClientError>;
63+
64+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
65+
pub struct EventMask {
66+
connected: EventMode,
67+
get: EventMode,
68+
get_many: EventMode,
69+
push: EventMode,
70+
transfer: EventMode,
71+
transfer_complete: EventMode2,
72+
transfer_aborted: EventMode2,
73+
}
74+
75+
/// Newtype wrapper that wraps an event so that it is a distinct type for the notify variant.
76+
#[derive(Debug, Serialize, Deserialize)]
77+
pub struct Notify<T>(T);
78+
79+
#[derive(Debug, Default)]
80+
pub struct Client {
81+
mask: EventMask,
82+
inner: Option<irpc::Client<ProviderProto>>,
83+
}
84+
85+
/// A new get request was received from the provider.
86+
#[derive(Debug, Serialize, Deserialize)]
87+
pub struct GetRequestReceived {
88+
/// The connection id. Multiple requests can be sent over the same connection.
89+
pub connection_id: u64,
90+
/// The request id. There is a new id for each request.
91+
pub request_id: u64,
92+
/// The root hash of the request.
93+
pub hash: Hash,
94+
/// The exact query ranges of the request.
95+
pub ranges: ChunkRangesSeq,
96+
}
97+
98+
#[derive(Debug, Serialize, Deserialize)]
99+
pub struct GetManyRequestReceived {
100+
/// The connection id. Multiple requests can be sent over the same connection.
101+
pub connection_id: u64,
102+
/// The request id. There is a new id for each request.
103+
pub request_id: u64,
104+
/// The root hash of the request.
105+
pub hashes: Vec<Hash>,
106+
/// The exact query ranges of the request.
107+
pub ranges: ChunkRangesSeq,
108+
}
109+
110+
#[derive(Debug, Serialize, Deserialize)]
111+
pub struct PushRequestReceived {
112+
/// The connection id. Multiple requests can be sent over the same connection.
113+
pub connection_id: u64,
114+
/// The request id. There is a new id for each request.
115+
pub request_id: u64,
116+
/// The root hash of the request.
117+
pub hash: Hash,
118+
/// The exact query ranges of the request.
119+
pub ranges: ChunkRangesSeq,
120+
}
121+
122+
#[derive(Debug, Serialize, Deserialize)]
123+
pub struct TransferProgress {
124+
/// The connection id. Multiple requests can be sent over the same connection.
125+
pub connection_id: u64,
126+
/// The request id. There is a new id for each request.
127+
pub request_id: u64,
128+
/// The index of the blob in the request. 0 for the first blob or for raw blob requests.
129+
pub index: u64,
130+
/// The end offset of the chunk that was sent.
131+
pub end_offset: u64,
132+
}
133+
134+
#[derive(Debug, Serialize, Deserialize)]
135+
pub struct TransferStarted {
136+
/// The connection id. Multiple requests can be sent over the same connection.
137+
pub connection_id: u64,
138+
/// The request id. There is a new id for each request.
139+
pub request_id: u64,
140+
/// The index of the blob in the request. 0 for the first blob or for raw blob requests.
141+
pub index: u64,
142+
/// The hash of the blob. This is the hash of the request for the first blob, the child hash (index-1) for subsequent blobs.
143+
pub hash: Hash,
144+
/// The size of the blob. This is the full size of the blob, not the size we are sending.
145+
pub size: u64,
146+
}
147+
148+
#[derive(Debug, Serialize, Deserialize)]
149+
pub struct TransferCompleted {
150+
/// The connection id. Multiple requests can be sent over the same connection.
151+
pub connection_id: u64,
152+
/// The request id. There is a new id for each request.
153+
pub request_id: u64,
154+
/// Statistics about the transfer.
155+
pub stats: Box<TransferStats>,
156+
}
157+
158+
#[derive(Debug, Serialize, Deserialize)]
159+
pub struct TransferAborted {
160+
/// The connection id. Multiple requests can be sent over the same connection.
161+
pub connection_id: u64,
162+
/// The request id. There is a new id for each request.
163+
pub request_id: u64,
164+
/// Statistics about the part of the transfer that was aborted.
165+
pub stats: Option<Box<TransferStats>>,
166+
}
167+
168+
/// Client for progress notifications.
169+
///
170+
/// For most event types, the client can be configured to either send notifications or requests that
171+
/// can have a response.
172+
impl Client {
173+
/// A client that does not send anything.
174+
pub const NONE: Self = Self {
175+
mask: EventMask {
176+
connected: EventMode::None,
177+
get: EventMode::None,
178+
get_many: EventMode::None,
179+
push: EventMode::None,
180+
transfer: EventMode::None,
181+
transfer_complete: EventMode2::None,
182+
transfer_aborted: EventMode2::None,
183+
},
184+
inner: None,
185+
};
186+
187+
pub async fn client_connected(&self, f: impl Fn() -> ClientConnected) -> ClientResult {
188+
Ok(if let Some(client) = &self.inner {
189+
match self.mask.connected {
190+
EventMode::None => {}
191+
EventMode::Notify => client.notify(Notify(f())).await?,
192+
EventMode::Request => client.rpc(f()).await??,
193+
}
194+
})
195+
}
196+
197+
pub async fn get_request(&self, f: impl Fn() -> GetRequestReceived) -> ClientResult {
198+
Ok(if let Some(client) = &self.inner {
199+
match self.mask.get {
200+
EventMode::None => {}
201+
EventMode::Notify => client.notify(Notify(f())).await?,
202+
EventMode::Request => client.rpc(f()).await??,
203+
}
204+
})
205+
}
206+
207+
pub async fn push_request(&self, f: impl Fn() -> PushRequestReceived) -> ClientResult {
208+
Ok(if let Some(client) = &self.inner {
209+
match self.mask.push {
210+
EventMode::None => {}
211+
EventMode::Notify => client.notify(Notify(f())).await?,
212+
EventMode::Request => client.rpc(f()).await??,
213+
}
214+
})
215+
}
216+
217+
pub async fn send_get_many_request(
218+
&self,
219+
f: impl Fn() -> GetManyRequestReceived,
220+
) -> ClientResult {
221+
Ok(if let Some(client) = &self.inner {
222+
match self.mask.get_many {
223+
EventMode::None => {}
224+
EventMode::Notify => client.notify(Notify(f())).await?,
225+
EventMode::Request => client.rpc(f()).await??,
226+
}
227+
})
228+
}
229+
230+
pub async fn transfer_progress(&self, f: impl Fn() -> TransferProgress) -> ClientResult {
231+
Ok(if let Some(client) = &self.inner {
232+
match self.mask.transfer {
233+
EventMode::None => {}
234+
EventMode::Notify => client.notify(Notify(f())).await?,
235+
EventMode::Request => client.rpc(f()).await??,
236+
}
237+
})
238+
}
239+
}
240+
241+
#[rpc_requests(message = ProviderMessage)]
242+
#[derive(Debug, Serialize, Deserialize)]
243+
pub enum ProviderProto {
244+
/// A new client connected to the provider.
245+
#[rpc(tx = oneshot::Sender<EventResult>)]
246+
#[wrap(ClientConnected)]
247+
ClientConnected { connection_id: u64, node_id: NodeId },
248+
/// A new client connected to the provider. Notify variant.
249+
#[rpc(tx = NoSender)]
250+
ClientConnectedNotify(Notify<ClientConnected>),
251+
/// A client disconnected from the provider.
252+
#[rpc(tx = NoSender)]
253+
#[wrap(ConnectionClosed)]
254+
ConnectionClosed { connection_id: u64 },
255+
256+
#[rpc(tx = oneshot::Sender<EventResult>)]
257+
/// A new get request was received from the provider.
258+
GetRequestReceived(GetRequestReceived),
259+
260+
#[rpc(tx = NoSender)]
261+
/// A new get request was received from the provider.
262+
GetRequestReceivedNotify(Notify<GetRequestReceived>),
263+
/// A new get request was received from the provider.
264+
#[rpc(tx = oneshot::Sender<EventResult>)]
265+
GetManyRequestReceived(GetManyRequestReceived),
266+
/// A new get request was received from the provider.
267+
#[rpc(tx = NoSender)]
268+
GetManyRequestReceivedNotify(Notify<GetManyRequestReceived>),
269+
/// A new get request was received from the provider.
270+
#[rpc(tx = oneshot::Sender<EventResult>)]
271+
PushRequestReceived(PushRequestReceived),
272+
/// A new get request was received from the provider.
273+
#[rpc(tx = NoSender)]
274+
PushRequestReceivedNotify(Notify<PushRequestReceived>),
275+
/// Transfer for the nth blob started.
276+
#[rpc(tx = NoSender)]
277+
TransferStarted(TransferStarted),
278+
/// Progress of the transfer.
279+
#[rpc(tx = oneshot::Sender<EventResult>)]
280+
TransferProgress(TransferProgress),
281+
/// Progress of the transfer.
282+
#[rpc(tx = NoSender)]
283+
TransferProgressNotify(Notify<TransferProgress>),
284+
/// Entire transfer completed.
285+
#[rpc(tx = NoSender)]
286+
TransferCompleted(TransferCompleted),
287+
/// Entire transfer aborted.
288+
#[rpc(tx = NoSender)]
289+
TransferAborted(TransferAborted),
290+
}

0 commit comments

Comments
 (0)