Skip to content

Commit e42f942

Browse files
committed
Rename open_bi and accept_bi to just open and accept
We only support bidirectional, so no need to disambiguate...
1 parent c2520b8 commit e42f942

File tree

14 files changed

+40
-44
lines changed

14 files changed

+40
-44
lines changed

examples/store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,15 @@ async fn _main_unsugared() -> anyhow::Result<()> {
239239
}
240240
let (server, client) = flume::connection::<Service>(1);
241241
let to_string_service = tokio::spawn(async move {
242-
let (mut send, mut recv) = server.accept_bi().await?;
242+
let (mut send, mut recv) = server.accept().await?;
243243
while let Some(item) = recv.next().await {
244244
let item = item?;
245245
println!("server got: {item:?}");
246246
send.send(item.to_string()).await?;
247247
}
248248
anyhow::Ok(())
249249
});
250-
let (mut send, mut recv) = client.open_bi().await?;
250+
let (mut send, mut recv) = client.open().await?;
251251
let print_result_service = tokio::spawn(async move {
252252
while let Some(item) = recv.next().await {
253253
let item = item?;

src/pattern/bidi_streaming.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ where
9696
M: BidiStreamingMsg<SInner>,
9797
{
9898
let msg = self.map.req_into_outer(msg.into());
99-
let (mut send, recv) = self.source.open_bi().await.map_err(Error::Open)?;
99+
let (mut send, recv) = self.source.open().await.map_err(Error::Open)?;
100100
send.send(msg).await.map_err(Error::<C>::Send)?;
101101
let send = UpdateSink(send, PhantomData, Arc::clone(&self.map));
102102
let map = Arc::clone(&self.map);

src/pattern/client_streaming.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ where
9898
M: ClientStreamingMsg<SInner>,
9999
{
100100
let msg = self.map.req_into_outer(msg.into());
101-
let (mut send, mut recv) = self.source.open_bi().await.map_err(Error::Open)?;
101+
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
102102
send.send(msg).map_err(Error::Send).await?;
103103
let send = UpdateSink::<S, C, M::Update, SInner>(send, PhantomData, Arc::clone(&self.map));
104104
let map = Arc::clone(&self.map);

src/pattern/rpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ where
7474
M: RpcMsg<SInner>,
7575
{
7676
let msg = self.map.req_into_outer(msg.into());
77-
let (mut send, mut recv) = self.source.open_bi().await.map_err(Error::Open)?;
77+
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
7878
send.send(msg).await.map_err(Error::<C>::Send)?;
7979
let res = recv
8080
.next()

src/pattern/server_streaming.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ where
8282
M: ServerStreamingMsg<SInner>,
8383
{
8484
let msg = self.map.req_into_outer(msg.into());
85-
let (mut send, recv) = self.source.open_bi().await.map_err(Error::Open)?;
85+
let (mut send, recv) = self.source.open().await.map_err(Error::Open)?;
8686
send.send(msg).map_err(Error::<C>::Send).await?;
8787
let map = Arc::clone(&self.map);
8888
let recv = recv.map(move |x| match x {

src/pattern/try_server_streaming.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ where
191191
Result<StreamCreated, M::CreateError>: Into<SInner::Res> + TryFrom<SInner::Res>,
192192
{
193193
let msg = self.map.req_into_outer(msg.into());
194-
let (mut send, mut recv) = self.source.open_bi().await.map_err(Error::Open)?;
194+
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
195195
send.send(msg).map_err(Error::Send).await?;
196196
let map = Arc::clone(&self.map);
197197
let Some(initial) = recv.next().await else {

src/server.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,7 @@ impl<S: Service, C: ServiceEndpoint<S>> RpcServer<S, C> {
151151
/// Accepts a new channel from a client. The result is an [Accepting] object that
152152
/// can be used to read the first request.
153153
pub async fn accept(&self) -> result::Result<Accepting<S, C>, RpcServerError<C>> {
154-
let (send, recv) = self
155-
.source
156-
.accept_bi()
157-
.await
158-
.map_err(RpcServerError::Accept)?;
154+
let (send, recv) = self.source.accept().await.map_err(RpcServerError::Accept)?;
159155
Ok(Accepting { send, recv })
160156
}
161157

src/transport/boxed.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ pub trait BoxableConnection<In: RpcMessage, Out: RpcMessage>:
210210
fn clone_box(&self) -> Box<dyn BoxableConnection<In, Out>>;
211211

212212
/// Open a channel to the remote che
213-
fn open_bi_boxed(&self) -> OpenFuture<In, Out>;
213+
fn open_boxed(&self) -> OpenFuture<In, Out>;
214214
}
215215

216216
/// A boxed connection
@@ -242,8 +242,8 @@ impl<S: Service> ConnectionErrors for Connection<S> {
242242
}
243243

244244
impl<S: Service> super::Connection<S::Res, S::Req> for Connection<S> {
245-
async fn open_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
246-
self.0.open_bi_boxed().await
245+
async fn open(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
246+
self.0.open_boxed().await
247247
}
248248
}
249249

@@ -290,7 +290,7 @@ impl<In: RpcMessage, Out: RpcMessage> ConnectionErrors for ServerEndpoint<In, Ou
290290
}
291291

292292
impl<In: RpcMessage, Out: RpcMessage> super::ServerEndpoint<In, Out> for ServerEndpoint<In, Out> {
293-
fn accept_bi(
293+
fn accept(
294294
&self,
295295
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::OpenError>> + Send
296296
{
@@ -308,9 +308,9 @@ impl<S: Service> BoxableConnection<S::Res, S::Req> for super::quinn::QuinnConnec
308308
Box::new(self.clone())
309309
}
310310

311-
fn open_bi_boxed(&self) -> OpenFuture<S::Res, S::Req> {
311+
fn open_boxed(&self) -> OpenFuture<S::Res, S::Req> {
312312
let f = Box::pin(async move {
313-
let (send, recv) = super::Connection::open_bi(self).await?;
313+
let (send, recv) = super::Connection::open(self).await?;
314314
// map the error types to anyhow
315315
let send = send.sink_map_err(anyhow::Error::from);
316316
let recv = recv.map_err(anyhow::Error::from);
@@ -329,7 +329,7 @@ impl<S: Service> BoxableServerEndpoint<S::Req, S::Res> for super::quinn::QuinnSe
329329

330330
fn accept_bi_boxed(&self) -> AcceptFuture<S::Req, S::Res> {
331331
let f = async move {
332-
let (send, recv) = super::ServerEndpoint::accept_bi(self).await?;
332+
let (send, recv) = super::ServerEndpoint::accept(self).await?;
333333
let send = send.sink_map_err(anyhow::Error::from);
334334
let recv = recv.map_err(anyhow::Error::from);
335335
anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv)))
@@ -348,8 +348,8 @@ impl<S: Service> BoxableConnection<S::Res, S::Req> for super::flume::FlumeConnec
348348
Box::new(self.clone())
349349
}
350350

351-
fn open_bi_boxed(&self) -> OpenFuture<S::Res, S::Req> {
352-
OpenFuture::direct(super::Connection::open_bi(self))
351+
fn open_boxed(&self) -> OpenFuture<S::Res, S::Req> {
352+
OpenFuture::direct(super::Connection::open(self))
353353
}
354354
}
355355

@@ -360,7 +360,7 @@ impl<S: Service> BoxableServerEndpoint<S::Req, S::Res> for super::flume::FlumeSe
360360
}
361361

362362
fn accept_bi_boxed(&self) -> AcceptFuture<S::Req, S::Res> {
363-
AcceptFuture::direct(super::ServerEndpoint::accept_bi(self))
363+
AcceptFuture::direct(super::ServerEndpoint::accept(self))
364364
}
365365

366366
fn local_addr(&self) -> &[super::LocalAddr] {
@@ -393,14 +393,14 @@ mod tests {
393393
let client = super::Connection::<FooService>::new(client);
394394
// spawn echo server
395395
tokio::spawn(async move {
396-
while let Ok((mut send, mut recv)) = server.accept_bi().await {
396+
while let Ok((mut send, mut recv)) = server.accept().await {
397397
if let Some(Ok(msg)) = recv.next().await {
398398
send.send(msg).await.ok();
399399
}
400400
}
401401
anyhow::Ok(())
402402
});
403-
if let Ok((mut send, mut recv)) = client.open_bi().await {
403+
if let Ok((mut send, mut recv)) = client.open().await {
404404
send.send(1).await.ok();
405405
let res = recv.next().await;
406406
println!("{:?}", res);

src/transport/combined.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,14 +289,14 @@ impl<A: Connection<S::Res, S::Req>, B: Connection<S::Res, S::Req>, S: Service>
289289
impl<A: Connection<S::Res, S::Req>, B: Connection<S::Res, S::Req>, S: Service>
290290
Connection<S::Res, S::Req> for CombinedConnection<A, B, S>
291291
{
292-
async fn open_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
292+
async fn open(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
293293
let this = self.clone();
294294
// try a first, then b
295295
if let Some(a) = this.a {
296-
let (send, recv) = a.open_bi().await.map_err(OpenBiError::A)?;
296+
let (send, recv) = a.open().await.map_err(OpenBiError::A)?;
297297
Ok((SendSink::A(send), RecvStream::A(recv)))
298298
} else if let Some(b) = this.b {
299-
let (send, recv) = b.open_bi().await.map_err(OpenBiError::B)?;
299+
let (send, recv) = b.open().await.map_err(OpenBiError::B)?;
300300
Ok((SendSink::B(send), RecvStream::B(recv)))
301301
} else {
302302
Err(OpenBiError::NoChannel)
@@ -322,18 +322,18 @@ impl<A: ServerEndpoint<S::Req, S::Res>, B: ServerEndpoint<S::Req, S::Res>, S: Se
322322
impl<A: ServerEndpoint<S::Req, S::Res>, B: ServerEndpoint<S::Req, S::Res>, S: Service>
323323
ServerEndpoint<S::Req, S::Res> for CombinedServerEndpoint<A, B, S>
324324
{
325-
async fn accept_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
325+
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
326326
let a_fut = async {
327327
if let Some(a) = &self.a {
328-
let (send, recv) = a.accept_bi().await.map_err(AcceptBiError::A)?;
328+
let (send, recv) = a.accept().await.map_err(AcceptBiError::A)?;
329329
Ok((SendSink::A(send), RecvStream::A(recv)))
330330
} else {
331331
std::future::pending().await
332332
}
333333
};
334334
let b_fut = async {
335335
if let Some(b) = &self.b {
336-
let (send, recv) = b.accept_bi().await.map_err(AcceptBiError::B)?;
336+
let (send, recv) = b.accept().await.map_err(AcceptBiError::B)?;
337337
Ok((SendSink::B(send), RecvStream::B(recv)))
338338
} else {
339339
std::future::pending().await
@@ -377,7 +377,7 @@ mod tests {
377377
flume::FlumeConnection<Service>,
378378
Service,
379379
>::new(None, None);
380-
let res = channel.open_bi().await;
380+
let res = channel.open().await;
381381
assert!(matches!(res, Err(OpenBiError::NoChannel)));
382382
}
383383
}

src/transport/flume.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl<S: Service> ConnectionErrors for FlumeServerEndpoint<S> {
131131

132132
type Socket<In, Out> = (self::SendSink<Out>, self::RecvStream<In>);
133133

134-
/// Future returned by [FlumeConnection::open_bi]
134+
/// Future returned by [FlumeConnection::open]
135135
pub struct OpenBiFuture<In: RpcMessage, Out: RpcMessage> {
136136
inner: flume::r#async::SendFut<'static, Socket<Out, In>>,
137137
res: Option<Socket<In, Out>>,
@@ -202,7 +202,7 @@ impl<S: Service> ConnectionCommon<S::Req, S::Res> for FlumeServerEndpoint<S> {
202202

203203
impl<S: Service> ServerEndpoint<S::Req, S::Res> for FlumeServerEndpoint<S> {
204204
#[allow(refining_impl_trait)]
205-
fn accept_bi(&self) -> AcceptBiFuture<S::Req, S::Res> {
205+
fn accept(&self) -> AcceptBiFuture<S::Req, S::Res> {
206206
AcceptBiFuture {
207207
wrapped: self.stream.clone().into_recv_async(),
208208
_p: PhantomData,
@@ -229,7 +229,7 @@ impl<S: Service> ConnectionCommon<S::Res, S::Req> for FlumeConnection<S> {
229229

230230
impl<S: Service> Connection<S::Res, S::Req> for FlumeConnection<S> {
231231
#[allow(refining_impl_trait)]
232-
fn open_bi(&self) -> OpenBiFuture<S::Res, S::Req> {
232+
fn open(&self) -> OpenBiFuture<S::Res, S::Req> {
233233
let (local_send, remote_recv) = flume::bounded::<S::Req>(128);
234234
let (remote_send, local_recv) = flume::bounded::<S::Res>(128);
235235
let remote_chan = (

0 commit comments

Comments
 (0)