Skip to content

Commit a8a3272

Browse files
committed
Merge remote-tracking branch 'origin/main' into Frando/clone-sender
2 parents bcc4024 + c35c97f commit a8a3272

File tree

6 files changed

+324
-33
lines changed

6 files changed

+324
-33
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "irpc"
3-
version = "0.2.3"
3+
version = "0.3.0"
44
edition = "2021"
55
authors = ["Rüdiger Klaehn <rklaehn@protonmail.com>", "n0 team"]
66
keywords = ["api", "protocol", "network", "rpc"]
@@ -42,7 +42,7 @@ futures-buffered ={ version = "0.2.9", optional = true }
4242
n0-future = { workspace = true }
4343
futures-util = { workspace = true, optional = true }
4444
# for the derive reexport/feature
45-
irpc-derive = { version = "0.2.3", path = "./irpc-derive", optional = true }
45+
irpc-derive = { version = "0.3.0", path = "./irpc-derive", optional = true }
4646

4747
[target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies]
4848
quinn = { workspace = true, optional = true, features = ["runtime-tokio"] }

irpc-derive/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "irpc-derive"
3-
version = "0.2.3"
3+
version = "0.3.0"
44
edition = "2021"
55
authors = ["Rüdiger Klaehn <rklaehn@protonmail.com>"]
66
keywords = ["api", "protocol", "network", "rpc", "macro"]

irpc-iroh/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "irpc-iroh"
3-
version = "0.2.3"
3+
version = "0.3.0"
44
edition = "2021"
55
authors = ["Rüdiger Klaehn <rklaehn@protonmail.com>", "n0 team"]
66
keywords = ["api", "protocol", "network", "rpc"]
@@ -17,12 +17,12 @@ tracing = { workspace = true }
1717
serde = { workspace = true }
1818
postcard = { workspace = true, features = ["alloc", "use-std"] }
1919
n0-future = { workspace = true }
20-
irpc = { version = "0.2.3", path = ".." }
20+
irpc = { version = "0.3.0", path = ".." }
2121

2222
[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
2323
getrandom = { version = "0.3", features = ["wasm_js"] }
2424

2525
[dev-dependencies]
2626
n0-future = { workspace = true }
2727
tracing-subscriber = { workspace = true, features = ["fmt"] }
28-
irpc-derive = { version = "0.2.3", path = "../irpc-derive" }
28+
irpc-derive = { version = "0.3.0", path = "../irpc-derive" }

irpc-iroh/examples/auth.rs

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
//! This example demonstrates a few things:
2+
//! * Using irpc with a cloneable server struct instead of with an actor loop
3+
//! * Manually implementing the connection loop
4+
//! * Authenticating peers
5+
6+
use anyhow::Result;
7+
use iroh::{protocol::Router, Endpoint};
8+
9+
use self::storage::{StorageClient, StorageServer};
10+
11+
#[tokio::main]
12+
async fn main() -> Result<()> {
13+
tracing_subscriber::fmt::init();
14+
println!("Remote use");
15+
remote().await?;
16+
Ok(())
17+
}
18+
19+
async fn remote() -> Result<()> {
20+
let (server_router, server_addr) = {
21+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
22+
let server = StorageServer::new("secret".to_string());
23+
let router = Router::builder(endpoint.clone())
24+
.accept(StorageServer::ALPN, server.clone())
25+
.spawn();
26+
let addr = endpoint.node_addr().await?;
27+
(router, addr)
28+
};
29+
30+
// correct authentication
31+
let client_endpoint = Endpoint::builder().bind().await?;
32+
let api = StorageClient::connect(client_endpoint, server_addr.clone());
33+
api.auth("secret").await?;
34+
api.set("hello".to_string(), "world".to_string()).await?;
35+
api.set("goodbye".to_string(), "world".to_string()).await?;
36+
let value = api.get("hello".to_string()).await?;
37+
println!("value = {:?}", value);
38+
let mut list = api.list().await?;
39+
while let Some(value) = list.recv().await? {
40+
println!("list value = {:?}", value);
41+
}
42+
43+
// invalid authentication
44+
let client_endpoint = Endpoint::builder().bind().await?;
45+
let api = StorageClient::connect(client_endpoint, server_addr.clone());
46+
assert!(api.auth("bad").await.is_err());
47+
assert!(api.get("hello".to_string()).await.is_err());
48+
49+
// no authentication
50+
let client_endpoint = Endpoint::builder().bind().await?;
51+
let api = StorageClient::connect(client_endpoint, server_addr);
52+
assert!(api.get("hello".to_string()).await.is_err());
53+
54+
drop(server_router);
55+
Ok(())
56+
}
57+
58+
mod storage {
59+
//! Implementation of our storage service.
60+
//!
61+
//! The only `pub` item is [`StorageApi`], everything else is private.
62+
63+
use std::{
64+
collections::BTreeMap,
65+
sync::{Arc, Mutex},
66+
};
67+
68+
use anyhow::Result;
69+
use iroh::{
70+
endpoint::{Connection, RecvStream, SendStream},
71+
protocol::ProtocolHandler,
72+
Endpoint,
73+
};
74+
use irpc::{
75+
channel::{oneshot, spsc},
76+
Client, Service, WithChannels,
77+
};
78+
// Import the macro
79+
use irpc_derive::rpc_requests;
80+
use irpc_iroh::{read_request, IrohRemoteConnection};
81+
use serde::{Deserialize, Serialize};
82+
use tracing::info;
83+
84+
const ALPN: &[u8] = b"storage-api/0";
85+
86+
/// A simple storage service, just to try it out
87+
#[derive(Debug, Clone, Copy)]
88+
struct StorageService;
89+
90+
impl Service for StorageService {}
91+
92+
#[derive(Debug, Serialize, Deserialize)]
93+
struct Auth {
94+
token: String,
95+
}
96+
97+
#[derive(Debug, Serialize, Deserialize)]
98+
struct Get {
99+
key: String,
100+
}
101+
102+
#[derive(Debug, Serialize, Deserialize)]
103+
struct List;
104+
105+
#[derive(Debug, Serialize, Deserialize)]
106+
struct Set {
107+
key: String,
108+
value: String,
109+
}
110+
111+
#[derive(Debug, Serialize, Deserialize)]
112+
struct SetMany;
113+
114+
// Use the macro to generate both the StorageProtocol and StorageMessage enums
115+
// plus implement Channels for each type
116+
#[rpc_requests(StorageService, message = StorageMessage)]
117+
#[derive(Serialize, Deserialize)]
118+
enum StorageProtocol {
119+
#[rpc(tx=oneshot::Sender<Result<(), String>>)]
120+
Auth(Auth),
121+
#[rpc(tx=oneshot::Sender<Option<String>>)]
122+
Get(Get),
123+
#[rpc(tx=oneshot::Sender<()>)]
124+
Set(Set),
125+
#[rpc(tx=oneshot::Sender<u64>, rx=spsc::Receiver<(String, String)>)]
126+
SetMany(SetMany),
127+
#[rpc(tx=spsc::Sender<String>)]
128+
List(List),
129+
}
130+
131+
#[derive(Debug, Clone)]
132+
pub struct StorageServer {
133+
state: Arc<Mutex<BTreeMap<String, String>>>,
134+
auth_token: String,
135+
}
136+
137+
impl ProtocolHandler for StorageServer {
138+
fn accept(&self, conn: Connection) -> n0_future::future::Boxed<Result<()>> {
139+
let this = self.clone();
140+
Box::pin(async move {
141+
let mut authed = false;
142+
while let Some((msg, rx, tx)) = read_request(&conn).await? {
143+
let msg_with_channels = upcast_message(msg, rx, tx);
144+
match msg_with_channels {
145+
StorageMessage::Auth(msg) => {
146+
let WithChannels { inner, tx, .. } = msg;
147+
if authed {
148+
conn.close(1u32.into(), b"invalid message");
149+
break;
150+
} else if inner.token != this.auth_token {
151+
conn.close(1u32.into(), b"permission denied");
152+
break;
153+
} else {
154+
authed = true;
155+
tx.send(Ok(())).await.ok();
156+
}
157+
}
158+
_ => {
159+
if !authed {
160+
conn.close(1u32.into(), b"permission denied");
161+
break;
162+
} else {
163+
this.handle_authenticated(msg_with_channels).await;
164+
}
165+
}
166+
}
167+
}
168+
conn.closed().await;
169+
Ok(())
170+
})
171+
}
172+
}
173+
174+
fn upcast_message(msg: StorageProtocol, rx: RecvStream, tx: SendStream) -> StorageMessage {
175+
match msg {
176+
StorageProtocol::Auth(msg) => WithChannels::from((msg, tx, rx)).into(),
177+
StorageProtocol::Get(msg) => WithChannels::from((msg, tx, rx)).into(),
178+
StorageProtocol::Set(msg) => WithChannels::from((msg, tx, rx)).into(),
179+
StorageProtocol::SetMany(msg) => WithChannels::from((msg, tx, rx)).into(),
180+
StorageProtocol::List(msg) => WithChannels::from((msg, tx, rx)).into(),
181+
}
182+
}
183+
184+
impl StorageServer {
185+
pub const ALPN: &[u8] = ALPN;
186+
187+
pub fn new(auth_token: String) -> Self {
188+
Self {
189+
state: Default::default(),
190+
auth_token,
191+
}
192+
}
193+
194+
async fn handle_authenticated(&self, msg: StorageMessage) {
195+
match msg {
196+
StorageMessage::Auth(_) => unreachable!("handled in ProtocolHandler::accept"),
197+
StorageMessage::Get(get) => {
198+
info!("get {:?}", get);
199+
let WithChannels { tx, inner, .. } = get;
200+
let res = self.state.lock().unwrap().get(&inner.key).cloned();
201+
tx.send(res).await.ok();
202+
}
203+
StorageMessage::Set(set) => {
204+
info!("set {:?}", set);
205+
let WithChannels { tx, inner, .. } = set;
206+
self.state.lock().unwrap().insert(inner.key, inner.value);
207+
tx.send(()).await.ok();
208+
}
209+
StorageMessage::SetMany(list) => {
210+
let WithChannels { tx, mut rx, .. } = list;
211+
let mut i = 0;
212+
while let Ok(Some((key, value))) = rx.recv().await {
213+
let mut state = self.state.lock().unwrap();
214+
state.insert(key, value);
215+
i += 1;
216+
}
217+
tx.send(i).await.ok();
218+
}
219+
StorageMessage::List(list) => {
220+
info!("list {:?}", list);
221+
let WithChannels { mut tx, .. } = list;
222+
let values = {
223+
let state = self.state.lock().unwrap();
224+
// TODO: use async lock to not clone here.
225+
let values: Vec<_> = state
226+
.iter()
227+
.map(|(key, value)| format!("{key}={value}"))
228+
.collect();
229+
values
230+
};
231+
for value in values {
232+
if tx.send(value).await.is_err() {
233+
break;
234+
}
235+
}
236+
}
237+
}
238+
}
239+
}
240+
241+
pub struct StorageClient {
242+
inner: Client<StorageMessage, StorageProtocol, StorageService>,
243+
}
244+
245+
impl StorageClient {
246+
pub const ALPN: &[u8] = ALPN;
247+
248+
pub fn connect(endpoint: Endpoint, addr: impl Into<iroh::NodeAddr>) -> StorageClient {
249+
let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec());
250+
StorageClient {
251+
inner: Client::boxed(conn),
252+
}
253+
}
254+
255+
pub async fn auth(&self, token: &str) -> Result<(), anyhow::Error> {
256+
self.inner
257+
.rpc(Auth {
258+
token: token.to_string(),
259+
})
260+
.await?
261+
.map_err(|err| anyhow::anyhow!(err))
262+
}
263+
264+
pub async fn get(&self, key: String) -> Result<Option<String>, irpc::Error> {
265+
self.inner.rpc(Get { key }).await
266+
}
267+
268+
pub async fn list(&self) -> Result<spsc::Receiver<String>, irpc::Error> {
269+
self.inner.server_streaming(List, 10).await
270+
}
271+
272+
pub async fn set(&self, key: String, value: String) -> Result<(), irpc::Error> {
273+
let msg = Set { key, value };
274+
self.inner.rpc(msg).await
275+
}
276+
}
277+
}

0 commit comments

Comments
 (0)