@@ -104,6 +104,7 @@ impl<T: RpcMessage> Sink<T> for SendSink<T> {
104104
105105enum RecvStreamInner < T : RpcMessage > {
106106 Direct ( :: flume:: r#async:: RecvStream < ' static , T > ) ,
107+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
107108 DirectTokio ( tokio_stream:: wrappers:: ReceiverStream < T > ) ,
108109 Boxed ( Pin < Box < dyn Stream < Item = Result < T , anyhow:: Error > > + Send + Sync + ' static > > ) ,
109110}
@@ -129,6 +130,7 @@ impl<T: RpcMessage> RecvStream<T> {
129130 }
130131
131132 /// Create a new receive stream from a direct flume receive stream
133+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
132134 pub ( crate ) fn direct_tokio ( stream : tokio_stream:: wrappers:: ReceiverStream < T > ) -> Self {
133135 Self ( RecvStreamInner :: DirectTokio ( stream) )
134136 }
@@ -144,6 +146,7 @@ impl<T: RpcMessage> Stream for RecvStream<T> {
144146 Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
145147 Poll :: Pending => Poll :: Pending ,
146148 } ,
149+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
147150 RecvStreamInner :: DirectTokio ( stream) => match stream. poll_next_unpin ( cx) {
148151 Poll :: Ready ( Some ( item) ) => Poll :: Ready ( Some ( Ok ( item) ) ) ,
149152 Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
@@ -158,6 +161,7 @@ enum OpenFutureInner<'a, In: RpcMessage, Out: RpcMessage> {
158161 /// A direct future (todo)
159162 Direct ( super :: flume:: OpenBiFuture < In , Out > ) ,
160163 /// A direct future (todo)
164+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
161165 DirectTokio ( BoxFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
162166 /// A boxed future
163167 Boxed ( BoxFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
@@ -172,6 +176,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> OpenFuture<'a, In, Out> {
172176 Self ( OpenFutureInner :: Direct ( f) )
173177 }
174178 /// Create a new boxed future
179+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
175180 pub fn direct_tokio (
176181 f : impl Future < Output = anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > + Send + Sync + ' a ,
177182 ) -> Self {
@@ -195,6 +200,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for OpenFuture<'a, In, Out> {
195200 . poll ( cx)
196201 . map_ok ( |( send, recv) | ( SendSink :: direct ( send. 0 ) , RecvStream :: direct ( recv. 0 ) ) )
197202 . map_err ( |e| e. into ( ) ) ,
203+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
198204 OpenFutureInner :: DirectTokio ( f) => f. poll ( cx) ,
199205 OpenFutureInner :: Boxed ( f) => f. poll ( cx) ,
200206 }
@@ -205,6 +211,7 @@ enum AcceptFutureInner<'a, In: RpcMessage, Out: RpcMessage> {
205211 /// A direct future
206212 Direct ( super :: flume:: AcceptBiFuture < In , Out > ) ,
207213 /// A direct future
214+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
208215 DirectTokio ( BoxedFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
209216 /// A boxed future
210217 Boxed ( BoxedFuture < ' a , anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > ) ,
@@ -220,6 +227,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> AcceptFuture<'a, In, Out> {
220227 }
221228
222229 /// bla
230+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
223231 pub fn direct_tokio (
224232 f : impl Future < Output = anyhow:: Result < ( SendSink < Out > , RecvStream < In > ) > > + Send + Sync + ' a ,
225233 ) -> Self {
@@ -243,6 +251,7 @@ impl<'a, In: RpcMessage, Out: RpcMessage> Future for AcceptFuture<'a, In, Out> {
243251 . poll ( cx)
244252 . map_ok ( |( send, recv) | ( SendSink :: direct ( send. 0 ) , RecvStream :: direct ( recv. 0 ) ) )
245253 . map_err ( |e| e. into ( ) ) ,
254+ #[ cfg( feature = "tokio-mpsc-transport" ) ]
246255 AcceptFutureInner :: DirectTokio ( f) => f. poll ( cx) ,
247256 AcceptFutureInner :: Boxed ( f) => f. poll ( cx) ,
248257 }
0 commit comments