Skip to content

Commit b0341a1

Browse files
authored
Map transports not services (#106)
2 parents be04be1 + 16b7c49 commit b0341a1

34 files changed

+1158
-917
lines changed

Cargo.lock

Lines changed: 31 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "quic-rpc"
3-
version = "0.14.0"
3+
version = "0.15.0"
44
edition = "2021"
55
authors = ["Rüdiger Klaehn <rklaehn@protonmail.com>", "n0 team"]
66
keywords = ["api", "protocol", "network", "rpc"]
@@ -49,12 +49,13 @@ tracing-subscriber = "0.3.16"
4949
tempfile = "3.5.0"
5050
proc-macro2 = "1.0.66"
5151
futures-buffered = "0.2.4"
52+
testresult = "0.4.1"
53+
nested_enum_utils = "0.1.0"
5254

5355
[features]
5456
hyper-transport = ["dep:flume", "dep:hyper", "dep:bincode", "dep:bytes", "dep:tokio-serde", "dep:tokio-util"]
5557
quinn-transport = ["dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "dep:tokio-util"]
5658
flume-transport = ["dep:flume"]
57-
combined-transport = []
5859
macros = []
5960
default = ["flume-transport"]
6061

examples/errors.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ impl Fs {
5555
#[tokio::main]
5656
async fn main() -> anyhow::Result<()> {
5757
let fs = Fs;
58-
let (server, client) = quic_rpc::transport::flume::service_connection::<IoService>(1);
59-
let client = RpcClient::new(client);
58+
let (server, client) = quic_rpc::transport::flume::channel(1);
59+
let client = RpcClient::<IoService, _>::new(client);
6060
let server = RpcServer::new(server);
6161
let handle = tokio::task::spawn(async move {
6262
for _ in 0..1 {

examples/macro.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ create_store_dispatch!(Store, dispatch_store_request);
105105

106106
#[tokio::main]
107107
async fn main() -> anyhow::Result<()> {
108-
let (server, client) = flume::service_connection::<StoreService>(1);
108+
let (server, client) = flume::channel(1);
109109
let server_handle = tokio::task::spawn(async move {
110110
let target = Store;
111111
run_server_loop(StoreService, server, target, dispatch_store_request).await

examples/modularize.rs

Lines changed: 59 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use anyhow::Result;
1111
use futures_lite::StreamExt;
1212
use futures_util::SinkExt;
13-
use quic_rpc::{transport::flume, RpcClient, RpcServer, ServiceConnection, ServiceEndpoint};
13+
use quic_rpc::{client::BoxedConnector, transport::flume, Listener, RpcClient, RpcServer};
1414
use tracing::warn;
1515

1616
use app::AppService;
@@ -19,19 +19,19 @@ use app::AppService;
1919
async fn main() -> Result<()> {
2020
// Spawn an inmemory connection.
2121
// Could use quic equally (all code in this example is generic over the transport)
22-
let (server_conn, client_conn) = flume::service_connection::<AppService>(1);
22+
let (server_conn, client_conn) = flume::channel(1);
2323

2424
// spawn the server
2525
let handler = app::Handler::default();
2626
tokio::task::spawn(run_server(server_conn, handler));
2727

2828
// run a client demo
29-
client_demo(client_conn).await?;
29+
client_demo(BoxedConnector::<AppService>::new(client_conn)).await?;
3030

3131
Ok(())
3232
}
3333

34-
async fn run_server<C: ServiceEndpoint<AppService>>(server_conn: C, handler: app::Handler) {
34+
async fn run_server<C: Listener<AppService>>(server_conn: C, handler: app::Handler) {
3535
let server = RpcServer::<AppService, _>::new(server_conn);
3636
loop {
3737
let Ok(accepting) = server.accept().await else {
@@ -50,8 +50,8 @@ async fn run_server<C: ServiceEndpoint<AppService>>(server_conn: C, handler: app
5050
}
5151
}
5252
}
53-
pub async fn client_demo<C: ServiceConnection<AppService>>(conn: C) -> Result<()> {
54-
let rpc_client = RpcClient::new(conn);
53+
pub async fn client_demo(conn: BoxedConnector<AppService>) -> Result<()> {
54+
let rpc_client = RpcClient::<AppService>::new(conn);
5555
let client = app::Client::new(rpc_client.clone());
5656

5757
// call a method from the top-level app client
@@ -99,15 +99,12 @@ mod app {
9999
//!
100100
//! It could also easily compose services from other crates or internal modules.
101101
102+
use super::iroh;
102103
use anyhow::Result;
103104
use derive_more::{From, TryInto};
104-
use quic_rpc::{
105-
message::RpcMsg, server::RpcChannel, RpcClient, Service, ServiceConnection, ServiceEndpoint,
106-
};
105+
use quic_rpc::{message::RpcMsg, server::RpcChannel, Listener, RpcClient, Service};
107106
use serde::{Deserialize, Serialize};
108107

109-
use super::iroh;
110-
111108
#[derive(Debug, Serialize, Deserialize, From, TryInto)]
112109
pub enum Request {
113110
Iroh(iroh::Request),
@@ -153,13 +150,17 @@ mod app {
153150
}
154151

155152
impl Handler {
156-
pub async fn handle_rpc_request<E: ServiceEndpoint<AppService>>(
153+
pub async fn handle_rpc_request<E: Listener<AppService>>(
157154
self,
158155
req: Request,
159-
chan: RpcChannel<AppService, E, AppService>,
156+
chan: RpcChannel<AppService, E>,
160157
) -> Result<()> {
161158
match req {
162-
Request::Iroh(req) => self.iroh.handle_rpc_request(req, chan.map()).await?,
159+
Request::Iroh(req) => {
160+
self.iroh
161+
.handle_rpc_request(req, chan.map().boxed())
162+
.await?
163+
}
163164
Request::AppVersion(req) => chan.rpc(req, self, Self::on_version).await?,
164165
};
165166
Ok(())
@@ -171,20 +172,16 @@ mod app {
171172
}
172173

173174
#[derive(Debug, Clone)]
174-
pub struct Client<S: Service, C: ServiceConnection<S>> {
175-
pub iroh: iroh::Client<S, C>,
176-
client: RpcClient<AppService, C, S>,
175+
pub struct Client {
176+
pub iroh: iroh::Client,
177+
client: RpcClient<AppService>,
177178
}
178179

179-
impl<S, C> Client<S, C>
180-
where
181-
S: Service,
182-
C: ServiceConnection<S>,
183-
{
184-
pub fn new(client: RpcClient<AppService, C, S>) -> Self {
180+
impl Client {
181+
pub fn new(client: RpcClient<AppService>) -> Self {
185182
Self {
186-
iroh: iroh::Client::new(client.clone().map()),
187-
client,
183+
client: client.clone(),
184+
iroh: iroh::Client::new(client.map().boxed()),
188185
}
189186
}
190187

@@ -202,7 +199,7 @@ mod iroh {
202199
203200
use anyhow::Result;
204201
use derive_more::{From, TryInto};
205-
use quic_rpc::{server::RpcChannel, RpcClient, Service, ServiceConnection, ServiceEndpoint};
202+
use quic_rpc::{server::RpcChannel, RpcClient, Service};
206203
use serde::{Deserialize, Serialize};
207204

208205
use super::{calc, clock};
@@ -233,38 +230,38 @@ mod iroh {
233230
}
234231

235232
impl Handler {
236-
pub async fn handle_rpc_request<S, E>(
233+
pub async fn handle_rpc_request(
237234
self,
238235
req: Request,
239-
chan: RpcChannel<IrohService, E, S>,
240-
) -> Result<()>
241-
where
242-
S: Service,
243-
E: ServiceEndpoint<S>,
244-
{
236+
chan: RpcChannel<IrohService>,
237+
) -> Result<()> {
245238
match req {
246-
Request::Calc(req) => self.calc.handle_rpc_request(req, chan.map()).await?,
247-
Request::Clock(req) => self.clock.handle_rpc_request(req, chan.map()).await?,
239+
Request::Calc(req) => {
240+
self.calc
241+
.handle_rpc_request(req, chan.map().boxed())
242+
.await?
243+
}
244+
Request::Clock(req) => {
245+
self.clock
246+
.handle_rpc_request(req, chan.map().boxed())
247+
.await?
248+
}
248249
}
249250
Ok(())
250251
}
251252
}
252253

253254
#[derive(Debug, Clone)]
254-
pub struct Client<S, C> {
255-
pub calc: calc::Client<S, C>,
256-
pub clock: clock::Client<S, C>,
255+
pub struct Client {
256+
pub calc: calc::Client,
257+
pub clock: clock::Client,
257258
}
258259

259-
impl<S, C> Client<S, C>
260-
where
261-
S: Service,
262-
C: ServiceConnection<S>,
263-
{
264-
pub fn new(client: RpcClient<IrohService, C, S>) -> Self {
260+
impl Client {
261+
pub fn new(client: RpcClient<IrohService>) -> Self {
265262
Self {
266-
calc: calc::Client::new(client.clone().map()),
267-
clock: clock::Client::new(client.clone().map()),
263+
calc: calc::Client::new(client.clone().map().boxed()),
264+
clock: clock::Client::new(client.clone().map().boxed()),
268265
}
269266
}
270267
}
@@ -280,7 +277,7 @@ mod calc {
280277
use quic_rpc::{
281278
message::{ClientStreaming, ClientStreamingMsg, Msg, RpcMsg},
282279
server::RpcChannel,
283-
RpcClient, Service, ServiceConnection, ServiceEndpoint,
280+
RpcClient, Service,
284281
};
285282
use serde::{Deserialize, Serialize};
286283
use std::fmt::Debug;
@@ -337,15 +334,11 @@ mod calc {
337334
pub struct Handler;
338335

339336
impl Handler {
340-
pub async fn handle_rpc_request<S, E>(
337+
pub async fn handle_rpc_request(
341338
self,
342339
req: Request,
343-
chan: RpcChannel<CalcService, E, S>,
344-
) -> Result<()>
345-
where
346-
S: Service,
347-
E: ServiceEndpoint<S>,
348-
{
340+
chan: RpcChannel<CalcService>,
341+
) -> Result<()> {
349342
match req {
350343
Request::Add(req) => chan.rpc(req, self, Self::on_add).await?,
351344
Request::Sum(req) => chan.client_streaming(req, self, Self::on_sum).await?,
@@ -373,16 +366,12 @@ mod calc {
373366
}
374367

375368
#[derive(Debug, Clone)]
376-
pub struct Client<S, C> {
377-
client: RpcClient<CalcService, C, S>,
369+
pub struct Client {
370+
client: RpcClient<CalcService>,
378371
}
379372

380-
impl<S, C> Client<S, C>
381-
where
382-
C: ServiceConnection<S>,
383-
S: Service,
384-
{
385-
pub fn new(client: RpcClient<CalcService, C, S>) -> Self {
373+
impl Client {
374+
pub fn new(client: RpcClient<CalcService>) -> Self {
386375
Self { client }
387376
}
388377
pub async fn add(&self, a: i64, b: i64) -> anyhow::Result<i64> {
@@ -403,7 +392,7 @@ mod clock {
403392
use quic_rpc::{
404393
message::{Msg, ServerStreaming, ServerStreamingMsg},
405394
server::RpcChannel,
406-
RpcClient, Service, ServiceConnection, ServiceEndpoint,
395+
RpcClient, Service,
407396
};
408397
use serde::{Deserialize, Serialize};
409398
use std::{
@@ -475,15 +464,11 @@ mod clock {
475464
h
476465
}
477466

478-
pub async fn handle_rpc_request<S, E>(
467+
pub async fn handle_rpc_request(
479468
self,
480469
req: Request,
481-
chan: RpcChannel<ClockService, E, S>,
482-
) -> Result<()>
483-
where
484-
S: Service,
485-
E: ServiceEndpoint<S>,
486-
{
470+
chan: RpcChannel<ClockService>,
471+
) -> Result<()> {
487472
match req {
488473
Request::Tick(req) => chan.server_streaming(req, self, Self::on_tick).await?,
489474
}
@@ -517,16 +502,12 @@ mod clock {
517502
}
518503

519504
#[derive(Debug, Clone)]
520-
pub struct Client<S, C> {
521-
client: RpcClient<ClockService, C, S>,
505+
pub struct Client {
506+
client: RpcClient<ClockService>,
522507
}
523508

524-
impl<S, C> Client<S, C>
525-
where
526-
C: ServiceConnection<S>,
527-
S: Service,
528-
{
529-
pub fn new(client: RpcClient<ClockService, C, S>) -> Self {
509+
impl Client {
510+
pub fn new(client: RpcClient<ClockService>) -> Self {
530511
Self { client }
531512
}
532513
pub async fn tick(&self) -> Result<BoxStream<Result<usize>>> {

0 commit comments

Comments
 (0)