Skip to content

Commit 55827e8

Browse files
committed
migrate more duplex socket features.
1 parent 9e1f32d commit 55827e8

File tree

9 files changed

+314
-174
lines changed

9 files changed

+314
-174
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ hex = "0.4.0"
3535
name = "echo"
3636
path = "examples/echo/main.rs"
3737

38+
[[example]]
39+
name = "echo_client"
40+
path = "examples/echo_client/main.rs"
41+
3842
# [[example]]
3943
# name = "proxy"
4044
# path = "examples/proxy/main.rs"

examples/echo/main.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,17 @@ extern crate rsocket_rust;
22
extern crate tokio;
33
#[macro_use]
44
extern crate log;
5-
use futures::{SinkExt, StreamExt};
6-
use rsocket_rust::frame::Frame;
75
use rsocket_rust::prelude::*;
8-
use rsocket_rust::transport::DuplexSocket;
9-
use rsocket_rust::transport::{self, Rx, Tx};
106
use std::env;
117
use std::error::Error;
12-
use std::sync::Arc;
13-
use tokio::net::TcpListener;
14-
use tokio::prelude::*;
15-
use tokio::sync::mpsc;
168

179
#[tokio::main]
1810
async fn main() -> Result<(), Box<dyn Error>> {
1911
env_logger::builder().init();
12+
let addr = env::args().nth(1).unwrap_or("127.0.0.1:7878".to_string());
13+
2014
RSocketFactory::receive()
21-
.transport(URI::Tcp("127.0.0.1:7878"))
15+
.transport(URI::Tcp(addr))
2216
.acceptor(|setup, sending_socket| {
2317
info!("accept setup: {:?}", setup);
2418
Box::new(EchoRSocket)

examples/echo_client/main.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
extern crate rsocket_rust;
2+
extern crate tokio;
3+
#[macro_use]
4+
extern crate log;
5+
use rsocket_rust::prelude::*;
6+
7+
#[tokio::main]
8+
async fn main() {
9+
env_logger::builder().init();
10+
let setup = Payload::builder()
11+
.set_data_utf8("Hello")
12+
.set_metadata_utf8("World")
13+
.build();
14+
let cli = RSocketFactory::connect()
15+
.transport(URI::Tcp("127.0.0.1:7878".to_string()))
16+
.setup(setup)
17+
.start()
18+
.await;
19+
20+
std::thread::sleep_ms(500000);
21+
}

src/spi.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ impl RSocket for EchoRSocket {
2929
}
3030

3131
fn request_response(&self, req: Payload) -> Single<Payload> {
32+
info!("echo request_response: {:?}", req);
3233
Box::pin(future::ok::<Payload, RSocketError>(req))
3334
}
3435
}

src/transport/conn.rs

Lines changed: 0 additions & 140 deletions
This file was deleted.

src/transport/misc.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,3 @@ impl From<u32> for StreamID {
3737
StreamID::new(v)
3838
}
3939
}
40-
41-
#[derive(Debug)]
42-
pub(crate) enum Handler {
43-
Request(oneshot::Sender<Payload>),
44-
Stream(mpsc::Sender<Payload>),
45-
}
46-
47-
#[derive(Debug)]
48-
pub(crate) struct Handlers {
49-
map: RwLock<HashMap<u32, Handler>>,
50-
}
51-
52-
impl Handlers {
53-
pub(crate) fn new() -> Handlers {
54-
Handlers {
55-
map: RwLock::new(HashMap::new()),
56-
}
57-
}
58-
}

src/transport/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
mod codec;
2-
mod conn;
32
mod misc;
3+
mod socket;
44
mod spi;
55
pub(crate) mod tcp;
66
pub(crate) use codec::RFrameCodec;
7-
pub use conn::DuplexSocket;
7+
pub(crate) use socket::DuplexSocket;
88
pub use spi::{Rx, Transport, Tx};

0 commit comments

Comments
 (0)