@@ -16,11 +16,6 @@ Higher level communication abstractions.
1616
1717#[ allow( missing_doc) ] ;
1818
19-
20- use std:: comm:: { GenericChan , GenericSmartChan , GenericPort } ;
21- use std:: comm:: { Chan , Port , Peekable } ;
22- use std:: comm;
23-
2419/// An extension of `pipes::stream` that allows both sending and receiving.
2520pub struct DuplexStream < T , U > {
2621 priv chan : Chan < T > ,
@@ -29,108 +24,73 @@ pub struct DuplexStream<T, U> {
2924
3025// Allow these methods to be used without import:
3126impl < T : Send , U : Send > DuplexStream < T , U > {
27+ /// Creates a bidirectional stream.
28+ pub fn new ( ) -> ( DuplexStream < T , U > , DuplexStream < U , T > ) {
29+ let ( p1, c2) = Chan :: new ( ) ;
30+ let ( p2, c1) = Chan :: new ( ) ;
31+ ( DuplexStream { chan : c1, port : p1 } ,
32+ DuplexStream { chan : c2, port : p2 } )
33+ }
3234 pub fn send ( & self , x : T ) {
3335 self . chan . send ( x)
3436 }
3537 pub fn try_send ( & self , x : T ) -> bool {
3638 self . chan . try_send ( x)
3739 }
38- pub fn recv ( & self , ) -> U {
40+ pub fn recv ( & self ) -> U {
3941 self . port . recv ( )
4042 }
4143 pub fn try_recv ( & self ) -> Option < U > {
4244 self . port . try_recv ( )
4345 }
44- pub fn peek ( & self ) -> bool {
45- self . port . peek ( )
46- }
47- }
48-
49- impl < T : Send , U : Send > GenericChan < T > for DuplexStream < T , U > {
50- fn send ( & self , x : T ) {
51- self . chan . send ( x)
52- }
53- }
54-
55- impl < T : Send , U : Send > GenericSmartChan < T > for DuplexStream < T , U > {
56- fn try_send ( & self , x : T ) -> bool {
57- self . chan . try_send ( x)
58- }
59- }
60-
61- impl < T : Send , U : Send > GenericPort < U > for DuplexStream < T , U > {
62- fn recv ( & self ) -> U {
63- self . port . recv ( )
64- }
65-
66- fn try_recv ( & self ) -> Option < U > {
67- self . port . try_recv ( )
68- }
69- }
70-
71- impl < T : Send , U : Send > Peekable < U > for DuplexStream < T , U > {
72- fn peek ( & self ) -> bool {
73- self . port . peek ( )
46+ pub fn recv_opt ( & self ) -> Option < U > {
47+ self . port . recv_opt ( )
7448 }
7549}
7650
77- /// Creates a bidirectional stream.
78- pub fn DuplexStream < T : Send , U : Send > ( )
79- -> ( DuplexStream < T , U > , DuplexStream < U , T > )
80- {
81- let ( p1, c2) = comm:: stream ( ) ;
82- let ( p2, c1) = comm:: stream ( ) ;
83- ( DuplexStream {
84- chan : c1,
85- port : p1
86- } ,
87- DuplexStream {
88- chan : c2,
89- port : p2
90- } )
91- }
92-
9351/// An extension of `pipes::stream` that provides synchronous message sending.
9452pub struct SyncChan < T > { priv duplex_stream : DuplexStream < T , ( ) > }
9553/// An extension of `pipes::stream` that acknowledges each message received.
9654pub struct SyncPort < T > { priv duplex_stream : DuplexStream < ( ) , T > }
9755
98- impl < T : Send > GenericChan < T > for SyncChan < T > {
99- fn send ( & self , val : T ) {
56+ impl < T : Send > SyncChan < T > {
57+ pub fn send ( & self , val : T ) {
10058 assert ! ( self . try_send( val) , "SyncChan.send: receiving port closed" ) ;
10159 }
102- }
10360
104- impl < T : Send > GenericSmartChan < T > for SyncChan < T > {
105- /// Sends a message, or report if the receiver has closed the connection before receiving.
106- fn try_send ( & self , val : T ) -> bool {
107- self . duplex_stream . try_send ( val) && self . duplex_stream . try_recv ( ) . is_some ( )
61+ /// Sends a message, or report if the receiver has closed the connection
62+ /// before receiving.
63+ pub fn try_send ( & self , val : T ) -> bool {
64+ self . duplex_stream . try_send ( val) && self . duplex_stream . recv_opt ( ) . is_some ( )
10865 }
10966}
11067
111- impl < T : Send > GenericPort < T > for SyncPort < T > {
112- fn recv ( & self ) -> T {
113- self . try_recv ( ) . expect ( "SyncPort.recv: sending channel closed" )
68+ impl < T : Send > SyncPort < T > {
69+ pub fn recv ( & self ) -> T {
70+ self . recv_opt ( ) . expect ( "SyncPort.recv: sending channel closed" )
11471 }
11572
116- fn try_recv ( & self ) -> Option < T > {
117- self . duplex_stream . try_recv ( ) . map ( |val| {
73+ pub fn recv_opt ( & self ) -> Option < T > {
74+ self . duplex_stream . recv_opt ( ) . map ( |val| {
11875 self . duplex_stream . try_send ( ( ) ) ;
11976 val
12077 } )
12178 }
122- }
12379
124- impl < T : Send > Peekable < T > for SyncPort < T > {
125- fn peek ( & self ) -> bool {
126- self . duplex_stream . peek ( )
80+ pub fn try_recv ( & self ) -> Option < T > {
81+ self . duplex_stream . try_recv ( ) . map ( |val| {
82+ self . duplex_stream . try_send ( ( ) ) ;
83+ val
84+ } )
12785 }
12886}
12987
130- /// Creates a stream whose channel, upon sending a message, blocks until the message is received.
88+ /// Creates a stream whose channel, upon sending a message, blocks until the
89+ /// message is received.
13190pub fn rendezvous < T : Send > ( ) -> ( SyncPort < T > , SyncChan < T > ) {
132- let ( chan_stream, port_stream) = DuplexStream ( ) ;
133- ( SyncPort { duplex_stream : port_stream } , SyncChan { duplex_stream : chan_stream } )
91+ let ( chan_stream, port_stream) = DuplexStream :: new ( ) ;
92+ ( SyncPort { duplex_stream : port_stream } ,
93+ SyncChan { duplex_stream : chan_stream } )
13494}
13595
13696#[ cfg( test) ]
@@ -141,7 +101,7 @@ mod test {
141101
142102 #[ test]
143103 pub fn DuplexStream1 ( ) {
144- let ( left, right) = DuplexStream ( ) ;
104+ let ( mut left, mut right) = DuplexStream :: new ( ) ;
145105
146106 left. send ( ~"abc") ;
147107 right. send ( 123 ) ;
@@ -152,9 +112,10 @@ mod test {
152112
153113 #[test]
154114 pub fn basic_rendezvous_test() {
155- let (port, chan) = rendezvous();
115+ let (mut port, chan) = rendezvous();
156116
157117 do spawn {
118+ let mut chan = chan;
158119 chan.send(" abc");
159120 }
160121
@@ -165,8 +126,9 @@ mod test {
165126 fn recv_a_lot ( ) {
166127 // Rendezvous streams should be able to handle any number of messages being sent
167128 do run_in_uv_task {
168- let ( port, chan) = rendezvous ( ) ;
129+ let ( mut port, chan) = rendezvous ( ) ;
169130 do spawn {
131+ let mut chan = chan;
170132 1000000 . times ( || { chan. send ( ( ) ) } )
171133 }
172134 1000000 . times ( || { port. recv ( ) } )
@@ -175,8 +137,9 @@ mod test {
175137
176138 #[ test]
177139 fn send_and_fail_and_try_recv ( ) {
178- let ( port, chan) = rendezvous ( ) ;
140+ let ( mut port, chan) = rendezvous ( ) ;
179141 do spawn {
142+ let mut chan = chan;
180143 chan. duplex_stream . send ( ( ) ) ; // Can't access this field outside this module
181144 fail ! ( )
182145 }
@@ -185,8 +148,9 @@ mod test {
185148
186149 #[ test]
187150 fn try_send_and_recv_then_fail_before_ack ( ) {
188- let ( port, chan) = rendezvous ( ) ;
151+ let ( port, mut chan) = rendezvous ( ) ;
189152 do spawn {
153+ let mut port = port;
190154 port. duplex_stream . recv ( ) ;
191155 fail ! ( )
192156 }
@@ -196,8 +160,9 @@ mod test {
196160 #[ test]
197161 #[ should_fail]
198162 fn send_and_recv_then_fail_before_ack ( ) {
199- let ( port, chan) = rendezvous ( ) ;
163+ let ( port, mut chan) = rendezvous ( ) ;
200164 do spawn {
165+ let mut port = port;
201166 port. duplex_stream . recv ( ) ;
202167 fail ! ( )
203168 }
0 commit comments