@@ -11,7 +11,7 @@ use irpc::{
1111 rpc:: { listen, Handler } ,
1212 rpc_requests,
1313 util:: { make_client_endpoint, make_server_endpoint} ,
14- Client , LocalSender , Request , Service , WithChannels ,
14+ Client , LocalSender , Request , RequestSender , Service ,
1515} ;
1616use n0_future:: {
1717 stream:: StreamExt ,
@@ -59,13 +59,13 @@ enum ComputeRequest {
5959#[ rpc_requests( ComputeService , message = ComputeMessage ) ]
6060#[ derive( Serialize , Deserialize ) ]
6161enum ComputeProtocol {
62- #[ rpc( tx =oneshot:: Sender <u128 >) ]
62+ #[ rpc( reply =oneshot:: Sender <u128 >) ]
6363 Sqr ( Sqr ) ,
64- #[ rpc( rx =mpsc:: Receiver <i64 >, tx =oneshot:: Sender <i64 >) ]
64+ #[ rpc( updates =mpsc:: Receiver <i64 >, reply =oneshot:: Sender <i64 >) ]
6565 Sum ( Sum ) ,
66- #[ rpc( tx =mpsc:: Sender <u64 >) ]
66+ #[ rpc( reply =mpsc:: Sender <u64 >) ]
6767 Fibonacci ( Fibonacci ) ,
68- #[ rpc( rx =mpsc:: Receiver <u64 >, tx =mpsc:: Sender <u64 >) ]
68+ #[ rpc( updates =mpsc:: Receiver <u64 >, reply =mpsc:: Sender <u64 >) ]
6969 Multiply ( Multiply ) ,
7070}
7171
@@ -76,10 +76,10 @@ struct ComputeActor {
7676
7777impl ComputeActor {
7878 pub fn local ( ) -> ComputeApi {
79- let ( tx , rx ) = tokio:: sync:: mpsc:: channel ( 128 ) ;
80- let actor = Self { recv : rx } ;
79+ let ( reply , request ) = tokio:: sync:: mpsc:: channel ( 128 ) ;
80+ let actor = Self { recv : request } ;
8181 n0_future:: task:: spawn ( actor. run ( ) ) ;
82- let local = LocalSender :: < ComputeMessage , ComputeService > :: from ( tx ) ;
82+ let local = LocalSender :: < ComputeMessage , ComputeService > :: from ( reply ) ;
8383 ComputeApi {
8484 inner : local. into ( ) ,
8585 }
@@ -99,34 +99,45 @@ impl ComputeActor {
9999 match msg {
100100 ComputeMessage :: Sqr ( sqr) => {
101101 trace ! ( "sqr {:?}" , sqr) ;
102- let WithChannels {
103- tx, inner, span, ..
102+ let Request {
103+ reply,
104+ message,
105+ span,
106+ ..
104107 } = sqr;
105108 let _entered = span. enter ( ) ;
106- let result = ( inner . num as u128 ) * ( inner . num as u128 ) ;
107- tx . send ( result) . await ?;
109+ let result = ( message . num as u128 ) * ( message . num as u128 ) ;
110+ reply . send ( result) . await ?;
108111 }
109112 ComputeMessage :: Sum ( sum) => {
110113 trace ! ( "sum {:?}" , sum) ;
111- let WithChannels { rx, tx, span, .. } = sum;
114+ let Request {
115+ updates,
116+ reply,
117+ span,
118+ ..
119+ } = sum;
112120 let _entered = span. enter ( ) ;
113- let mut receiver = rx ;
121+ let mut receiver = updates ;
114122 let mut total = 0 ;
115123 while let Some ( num) = receiver. recv ( ) . await ? {
116124 total += num;
117125 }
118- tx . send ( total) . await ?;
126+ reply . send ( total) . await ?;
119127 }
120128 ComputeMessage :: Fibonacci ( fib) => {
121129 trace ! ( "fibonacci {:?}" , fib) ;
122- let WithChannels {
123- tx, inner, span, ..
130+ let Request {
131+ reply,
132+ message,
133+ span,
134+ ..
124135 } = fib;
125136 let _entered = span. enter ( ) ;
126- let sender = tx ;
137+ let sender = reply ;
127138 let mut a = 0u64 ;
128139 let mut b = 1u64 ;
129- while a <= inner . max {
140+ while a <= message . max {
130141 sender. send ( a) . await ?;
131142 let next = a + b;
132143 a = b;
@@ -135,17 +146,17 @@ impl ComputeActor {
135146 }
136147 ComputeMessage :: Multiply ( mult) => {
137148 trace ! ( "multiply {:?}" , mult) ;
138- let WithChannels {
139- rx ,
140- tx ,
141- inner ,
149+ let Request {
150+ updates ,
151+ reply ,
152+ message ,
142153 span,
143154 ..
144155 } = mult;
145156 let _entered = span. enter ( ) ;
146- let mut receiver = rx ;
147- let sender = tx ;
148- let multiplier = inner . initial ;
157+ let mut receiver = updates ;
158+ let sender = reply ;
159+ let multiplier = message . initial ;
149160 while let Some ( num) = receiver. recv ( ) . await ? {
150161 sender. send ( multiplier * num) . await ?;
151162 }
@@ -171,13 +182,13 @@ impl ComputeApi {
171182 let Some ( local) = self . inner . local ( ) else {
172183 bail ! ( "cannot listen on a remote service" ) ;
173184 } ;
174- let handler: Handler < ComputeProtocol > = Arc :: new ( move |msg, rx , tx | {
185+ let handler: Handler < ComputeProtocol > = Arc :: new ( move |msg, request , reply | {
175186 let local = local. clone ( ) ;
176187 Box :: pin ( match msg {
177- ComputeProtocol :: Sqr ( msg) => local. send ( ( msg, tx ) ) ,
178- ComputeProtocol :: Sum ( msg) => local. send ( ( msg, tx , rx ) ) ,
179- ComputeProtocol :: Fibonacci ( msg) => local. send ( ( msg, tx ) ) ,
180- ComputeProtocol :: Multiply ( msg) => local. send ( ( msg, tx , rx ) ) ,
188+ ComputeProtocol :: Sqr ( msg) => local. send ( ( msg, reply ) ) ,
189+ ComputeProtocol :: Sum ( msg) => local. send ( ( msg, reply , request ) ) ,
190+ ComputeProtocol :: Fibonacci ( msg) => local. send ( ( msg, reply ) ) ,
191+ ComputeProtocol :: Multiply ( msg) => local. send ( ( msg, reply , request ) ) ,
181192 } )
182193 } ) ;
183194 Ok ( AbortOnDropHandle :: new ( task:: spawn ( listen (
@@ -188,45 +199,45 @@ impl ComputeApi {
188199 pub async fn sqr ( & self , num : u64 ) -> anyhow:: Result < oneshot:: Receiver < u128 > > {
189200 let msg = Sqr { num } ;
190201 match self . inner . request ( ) . await ? {
191- Request :: Local ( request ) => {
192- let ( tx , rx ) = oneshot:: channel ( ) ;
193- request . send ( ( msg, tx ) ) . await ?;
194- Ok ( rx )
202+ RequestSender :: Local ( sender ) => {
203+ let ( reply , request ) = oneshot:: channel ( ) ;
204+ sender . send ( ( msg, reply ) ) . await ?;
205+ Ok ( request )
195206 }
196- Request :: Remote ( request ) => {
197- let ( _tx , rx ) = request . write ( msg) . await ?;
198- Ok ( rx . into ( ) )
207+ RequestSender :: Remote ( sender ) => {
208+ let ( _reply , request ) = sender . write ( msg) . await ?;
209+ Ok ( request . into ( ) )
199210 }
200211 }
201212 }
202213
203214 pub async fn sum ( & self ) -> anyhow:: Result < ( mpsc:: Sender < i64 > , oneshot:: Receiver < i64 > ) > {
204215 let msg = Sum ;
205216 match self . inner . request ( ) . await ? {
206- Request :: Local ( request ) => {
207- let ( num_tx , num_rx ) = mpsc:: channel ( 10 ) ;
208- let ( sum_tx , sum_rx ) = oneshot:: channel ( ) ;
209- request . send ( ( msg, sum_tx , num_rx ) ) . await ?;
210- Ok ( ( num_tx , sum_rx ) )
217+ RequestSender :: Local ( sender ) => {
218+ let ( num_reply , num_request ) = mpsc:: channel ( 10 ) ;
219+ let ( sum_reply , sum_request ) = oneshot:: channel ( ) ;
220+ sender . send ( ( msg, sum_reply , num_request ) ) . await ?;
221+ Ok ( ( num_reply , sum_request ) )
211222 }
212- Request :: Remote ( request ) => {
213- let ( tx , rx ) = request . write ( msg) . await ?;
214- Ok ( ( tx . into ( ) , rx . into ( ) ) )
223+ RequestSender :: Remote ( sender ) => {
224+ let ( reply , request ) = sender . write ( msg) . await ?;
225+ Ok ( ( reply . into ( ) , request . into ( ) ) )
215226 }
216227 }
217228 }
218229
219230 pub async fn fibonacci ( & self , max : u64 ) -> anyhow:: Result < mpsc:: Receiver < u64 > > {
220231 let msg = Fibonacci { max } ;
221232 match self . inner . request ( ) . await ? {
222- Request :: Local ( request ) => {
223- let ( tx , rx ) = mpsc:: channel ( 128 ) ;
224- request . send ( ( msg, tx ) ) . await ?;
225- Ok ( rx )
233+ RequestSender :: Local ( sender ) => {
234+ let ( reply , request ) = mpsc:: channel ( 128 ) ;
235+ sender . send ( ( msg, reply ) ) . await ?;
236+ Ok ( request )
226237 }
227- Request :: Remote ( request ) => {
228- let ( _tx , rx ) = request . write ( msg) . await ?;
229- Ok ( rx . into ( ) )
238+ RequestSender :: Remote ( sender ) => {
239+ let ( _reply , request ) = sender . write ( msg) . await ?;
240+ Ok ( request . into ( ) )
230241 }
231242 }
232243 }
@@ -237,15 +248,15 @@ impl ComputeApi {
237248 ) -> anyhow:: Result < ( mpsc:: Sender < u64 > , mpsc:: Receiver < u64 > ) > {
238249 let msg = Multiply { initial } ;
239250 match self . inner . request ( ) . await ? {
240- Request :: Local ( request ) => {
241- let ( in_tx , in_rx ) = mpsc:: channel ( 128 ) ;
242- let ( out_tx , out_rx ) = mpsc:: channel ( 128 ) ;
243- request . send ( ( msg, out_tx , in_rx ) ) . await ?;
244- Ok ( ( in_tx , out_rx ) )
251+ RequestSender :: Local ( sender ) => {
252+ let ( in_reply , in_request ) = mpsc:: channel ( 128 ) ;
253+ let ( out_reply , out_request ) = mpsc:: channel ( 128 ) ;
254+ sender . send ( ( msg, out_reply , in_request ) ) . await ?;
255+ Ok ( ( in_reply , out_request ) )
245256 }
246- Request :: Remote ( request ) => {
247- let ( tx , rx ) = request . write ( msg) . await ?;
248- Ok ( ( tx . into ( ) , rx . into ( ) ) )
257+ RequestSender :: Remote ( sender ) => {
258+ let ( reply , request ) = sender . write ( msg) . await ?;
259+ Ok ( ( reply . into ( ) , request . into ( ) ) )
249260 }
250261 }
251262 }
0 commit comments