Skip to content

Commit f55ddc5

Browse files
integrate boxed transport
1 parent d357445 commit f55ddc5

File tree

1 file changed

+88
-1
lines changed

1 file changed

+88
-1
lines changed

src/transport/boxed.rs

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88

99
use futures_lite::FutureExt;
1010
use futures_sink::Sink;
11-
#[cfg(feature = "quinn-transport")]
11+
#[cfg(any(feature = "quinn-transport", feature = "tokio-mpsc-transport"))]
1212
use futures_util::TryStreamExt;
1313
use futures_util::{future::BoxFuture, SinkExt, Stream, StreamExt};
1414
use pin_project::pin_project;
@@ -21,6 +21,7 @@ type BoxedFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
2121

2222
enum SendSinkInner<T: RpcMessage> {
2323
Direct(::flume::r#async::SendSink<'static, T>),
24+
DirectTokio(tokio_util::sync::PollSender<T>),
2425
Boxed(Pin<Box<dyn Sink<T, Error = anyhow::Error> + Send + Sync + 'static>>),
2526
}
2627

@@ -42,6 +43,10 @@ impl<T: RpcMessage> SendSink<T> {
4243
pub(crate) fn direct(sink: ::flume::r#async::SendSink<'static, T>) -> Self {
4344
Self(SendSinkInner::Direct(sink))
4445
}
46+
47+
pub(crate) fn direct_tokio(sink: tokio_util::sync::PollSender<T>) -> Self {
48+
Self(SendSinkInner::DirectTokio(sink))
49+
}
4550
}
4651

4752
impl<T: RpcMessage> Sink<T> for SendSink<T> {
@@ -53,13 +58,19 @@ impl<T: RpcMessage> Sink<T> for SendSink<T> {
5358
) -> Poll<Result<(), Self::Error>> {
5459
match self.project().0 {
5560
SendSinkInner::Direct(sink) => sink.poll_ready_unpin(cx).map_err(anyhow::Error::from),
61+
SendSinkInner::DirectTokio(sink) => {
62+
sink.poll_ready_unpin(cx).map_err(anyhow::Error::from)
63+
}
5664
SendSinkInner::Boxed(sink) => sink.poll_ready_unpin(cx).map_err(anyhow::Error::from),
5765
}
5866
}
5967

6068
fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
6169
match self.project().0 {
6270
SendSinkInner::Direct(sink) => sink.start_send_unpin(item).map_err(anyhow::Error::from),
71+
SendSinkInner::DirectTokio(sink) => {
72+
sink.start_send_unpin(item).map_err(anyhow::Error::from)
73+
}
6374
SendSinkInner::Boxed(sink) => sink.start_send_unpin(item).map_err(anyhow::Error::from),
6475
}
6576
}
@@ -70,6 +81,9 @@ impl<T: RpcMessage> Sink<T> for SendSink<T> {
7081
) -> Poll<Result<(), Self::Error>> {
7182
match self.project().0 {
7283
SendSinkInner::Direct(sink) => sink.poll_flush_unpin(cx).map_err(anyhow::Error::from),
84+
SendSinkInner::DirectTokio(sink) => {
85+
sink.poll_flush_unpin(cx).map_err(anyhow::Error::from)
86+
}
7387
SendSinkInner::Boxed(sink) => sink.poll_flush_unpin(cx).map_err(anyhow::Error::from),
7488
}
7589
}
@@ -80,13 +94,17 @@ impl<T: RpcMessage> Sink<T> for SendSink<T> {
8094
) -> Poll<Result<(), Self::Error>> {
8195
match self.project().0 {
8296
SendSinkInner::Direct(sink) => sink.poll_close_unpin(cx).map_err(anyhow::Error::from),
97+
SendSinkInner::DirectTokio(sink) => {
98+
sink.poll_close_unpin(cx).map_err(anyhow::Error::from)
99+
}
83100
SendSinkInner::Boxed(sink) => sink.poll_close_unpin(cx).map_err(anyhow::Error::from),
84101
}
85102
}
86103
}
87104

88105
enum RecvStreamInner<T: RpcMessage> {
89106
Direct(::flume::r#async::RecvStream<'static, T>),
107+
DirectTokio(tokio_stream::wrappers::ReceiverStream<T>),
90108
Boxed(Pin<Box<dyn Stream<Item = Result<T, anyhow::Error>> + Send + Sync + 'static>>),
91109
}
92110

@@ -109,6 +127,11 @@ impl<T: RpcMessage> RecvStream<T> {
109127
pub(crate) fn direct(stream: ::flume::r#async::RecvStream<'static, T>) -> Self {
110128
Self(RecvStreamInner::Direct(stream))
111129
}
130+
131+
/// Create a new receive stream from a direct flume receive stream
132+
pub(crate) fn direct_tokio(stream: tokio_stream::wrappers::ReceiverStream<T>) -> Self {
133+
Self(RecvStreamInner::DirectTokio(stream))
134+
}
112135
}
113136

114137
impl<T: RpcMessage> Stream for RecvStream<T> {
@@ -121,6 +144,11 @@ impl<T: RpcMessage> Stream for RecvStream<T> {
121144
Poll::Ready(None) => Poll::Ready(None),
122145
Poll::Pending => Poll::Pending,
123146
},
147+
RecvStreamInner::DirectTokio(stream) => match stream.poll_next_unpin(cx) {
148+
Poll::Ready(Some(item)) => Poll::Ready(Some(Ok(item))),
149+
Poll::Ready(None) => Poll::Ready(None),
150+
Poll::Pending => Poll::Pending,
151+
},
124152
RecvStreamInner::Boxed(stream) => stream.poll_next_unpin(cx),
125153
}
126154
}
@@ -129,6 +157,8 @@ impl<T: RpcMessage> Stream for RecvStream<T> {
129157
enum OpenFutureInner<'a, In: RpcMessage, Out: RpcMessage> {
130158
/// A direct future (todo)
131159
Direct(super::flume::OpenBiFuture<In, Out>),
160+
/// A direct future (todo)
161+
DirectTokio(BoxFuture<'a, anyhow::Result<(SendSink<Out>, RecvStream<In>)>>),
132162
/// A boxed future
133163
Boxed(BoxFuture<'a, anyhow::Result<(SendSink<Out>, RecvStream<In>)>>),
134164
}
@@ -141,6 +171,12 @@ impl<'a, In: RpcMessage, Out: RpcMessage> OpenFuture<'a, In, Out> {
141171
fn direct(f: super::flume::OpenBiFuture<In, Out>) -> Self {
142172
Self(OpenFutureInner::Direct(f))
143173
}
174+
/// Create a new boxed future
175+
pub fn direct_tokio(
176+
f: impl Future<Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>> + Send + Sync + 'a,
177+
) -> Self {
178+
Self(OpenFutureInner::DirectTokio(Box::pin(f)))
179+
}
144180

145181
/// Create a new boxed future
146182
pub fn boxed(
@@ -159,6 +195,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'a, In, Out> {
159195
.poll(cx)
160196
.map_ok(|(send, recv)| (SendSink::direct(send.0), RecvStream::direct(recv.0)))
161197
.map_err(|e| e.into()),
198+
OpenFutureInner::DirectTokio(f) => f.poll(cx),
162199
OpenFutureInner::Boxed(f) => f.poll(cx),
163200
}
164201
}
@@ -167,6 +204,8 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'a, In, Out> {
167204
enum AcceptFutureInner<'a, In: RpcMessage, Out: RpcMessage> {
168205
/// A direct future
169206
Direct(super::flume::AcceptBiFuture<In, Out>),
207+
/// A direct future
208+
DirectTokio(BoxedFuture<'a, anyhow::Result<(SendSink<Out>, RecvStream<In>)>>),
170209
/// A boxed future
171210
Boxed(BoxedFuture<'a, anyhow::Result<(SendSink<Out>, RecvStream<In>)>>),
172211
}
@@ -180,6 +219,13 @@ impl<'a, In: RpcMessage, Out: RpcMessage> AcceptFuture<'a, In, Out> {
180219
Self(AcceptFutureInner::Direct(f))
181220
}
182221

222+
/// bla
223+
pub fn direct_tokio(
224+
f: impl Future<Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>> + Send + Sync + 'a,
225+
) -> Self {
226+
Self(AcceptFutureInner::DirectTokio(Box::pin(f)))
227+
}
228+
183229
/// Create a new boxed future
184230
pub fn boxed(
185231
f: impl Future<Output = anyhow::Result<(SendSink<Out>, RecvStream<In>)>> + Send + Sync + 'a,
@@ -197,6 +243,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'a, In, Out> {
197243
.poll(cx)
198244
.map_ok(|(send, recv)| (SendSink::direct(send.0), RecvStream::direct(recv.0)))
199245
.map_err(|e| e.into()),
246+
AcceptFutureInner::DirectTokio(f) => f.poll(cx),
200247
AcceptFutureInner::Boxed(f) => f.poll(cx),
201248
}
202249
}
@@ -368,6 +415,46 @@ impl<S: Service> BoxableServerEndpoint<S::Req, S::Res> for super::flume::FlumeSe
368415
}
369416
}
370417

418+
#[cfg(feature = "tokio-mpsc-transport")]
419+
impl<S: Service> BoxableConnection<S::Res, S::Req> for super::tokio_mpsc::Connection<S> {
420+
fn clone_box(&self) -> Box<dyn BoxableConnection<S::Res, S::Req>> {
421+
Box::new(self.clone())
422+
}
423+
424+
fn open_bi_boxed(&self) -> OpenFuture<S::Res, S::Req> {
425+
let f = Box::pin(async move {
426+
let (send, recv) = super::Connection::open_bi(self).await?;
427+
// return the boxed streams
428+
anyhow::Ok((
429+
SendSink::direct_tokio(send.0),
430+
RecvStream::direct_tokio(recv.0),
431+
))
432+
});
433+
OpenFuture::direct_tokio(f)
434+
}
435+
}
436+
437+
#[cfg(feature = "tokio-mpsc-transport")]
438+
impl<S: Service> BoxableServerEndpoint<S::Req, S::Res> for super::tokio_mpsc::ServerEndpoint<S> {
439+
fn clone_box(&self) -> Box<dyn BoxableServerEndpoint<S::Req, S::Res>> {
440+
Box::new(self.clone())
441+
}
442+
443+
fn accept_bi_boxed(&self) -> AcceptFuture<S::Req, S::Res> {
444+
let f = async move {
445+
let (send, recv) = super::ServerEndpoint::accept_bi(self).await?;
446+
let send = send.sink_map_err(anyhow::Error::from);
447+
let recv = recv.map_err(anyhow::Error::from);
448+
anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv)))
449+
};
450+
AcceptFuture::direct_tokio(f)
451+
}
452+
453+
fn local_addr(&self) -> &[super::LocalAddr] {
454+
super::ServerEndpoint::local_addr(self)
455+
}
456+
}
457+
371458
#[cfg(test)]
372459
mod tests {
373460
use crate::Service;

0 commit comments

Comments
 (0)