1- // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
1+ // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
22// file at the top-level directory of this distribution and at
33// http://rust-lang.org/COPYRIGHT.
44//
@@ -90,9 +90,55 @@ pub fn DuplexStream<T:Send,U:Send>()
9090 } )
9191}
9292
93+ /// An extension of `pipes::stream` that provides synchronous message sending.
94+ pub struct SyncChan < T > { priv duplex_stream : DuplexStream < T , ( ) > }
95+ /// An extension of `pipes::stream` that acknowledges each message received.
96+ pub struct SyncPort < T > { priv duplex_stream : DuplexStream < ( ) , T > }
97+
98+ impl < T : Send > GenericChan < T > for SyncChan < T > {
99+ fn send ( & self , val : T ) {
100+ assert ! ( self . try_send( val) , "SyncChan.send: receiving port closed" ) ;
101+ }
102+ }
103+
104+ impl < T : Send > GenericSmartChan < T > for SyncChan < T > {
105+ /// Sends a message, or report if the receiver has closed the connection before receiving.
106+ fn try_send ( & self , val : T ) -> bool {
107+ self . duplex_stream . try_send ( val) && self . duplex_stream . try_recv ( ) . is_some ( )
108+ }
109+ }
110+
111+ impl < T : Send > GenericPort < T > for SyncPort < T > {
112+ fn recv ( & self ) -> T {
113+ self . try_recv ( ) . expect ( "SyncPort.recv: sending channel closed" )
114+ }
115+
116+ fn try_recv ( & self ) -> Option < T > {
117+ do self . duplex_stream . try_recv ( ) . map_move |val| {
118+ self . duplex_stream . try_send ( ( ) ) ;
119+ val
120+ }
121+ }
122+ }
123+
124+ impl < T : Send > Peekable < T > for SyncPort < T > {
125+ fn peek ( & self ) -> bool {
126+ self . duplex_stream . peek ( )
127+ }
128+ }
129+
130+ /// Creates a stream whose channel, upon sending a message, blocks until the message is received.
131+ pub fn rendezvous < T : Send > ( ) -> ( SyncPort < T > , SyncChan < T > ) {
132+ let ( chan_stream, port_stream) = DuplexStream ( ) ;
133+ ( SyncPort { duplex_stream : port_stream } , SyncChan { duplex_stream : chan_stream } )
134+ }
135+
93136#[ cfg( test) ]
94137mod test {
95- use comm:: DuplexStream ;
138+ use comm:: { DuplexStream , rendezvous} ;
139+ use std:: rt:: test:: run_in_newsched_task;
140+ use std:: task:: spawn_unlinked;
141+
96142
97143 #[ test]
98144 pub fn DuplexStream1 ( ) {
@@ -104,4 +150,58 @@ mod test {
104150 assert ! ( left. recv( ) == 123 ) ;
105151 assert ! ( right. recv( ) == ~"abc");
106152 }
153+
154+ #[test]
155+ pub fn basic_rendezvous_test() {
156+ let (port, chan) = rendezvous();
157+
158+ do spawn {
159+ chan.send(" abc");
160+ }
161+
162+ assert!(port.recv() == " abc" ) ;
163+ }
164+
165+ #[ test]
166+ fn recv_a_lot ( ) {
167+ // Rendezvous streams should be able to handle any number of messages being sent
168+ do run_in_newsched_task {
169+ let ( port, chan) = rendezvous ( ) ;
170+ do spawn {
171+ do 1000000 . times { chan. send ( ( ) ) }
172+ }
173+ do 1000000 . times { port. recv ( ) }
174+ }
175+ }
176+
177+ #[ test]
178+ fn send_and_fail_and_try_recv ( ) {
179+ let ( port, chan) = rendezvous ( ) ;
180+ do spawn_unlinked {
181+ chan. duplex_stream . send ( ( ) ) ; // Can't access this field outside this module
182+ fail ! ( )
183+ }
184+ port. recv ( )
185+ }
186+
187+ #[ test]
188+ fn try_send_and_recv_then_fail_before_ack ( ) {
189+ let ( port, chan) = rendezvous ( ) ;
190+ do spawn_unlinked {
191+ port. duplex_stream . recv ( ) ;
192+ fail ! ( )
193+ }
194+ chan. try_send ( ( ) ) ;
195+ }
196+
197+ #[ test]
198+ #[ should_fail]
199+ fn send_and_recv_then_fail_before_ack ( ) {
200+ let ( port, chan) = rendezvous ( ) ;
201+ do spawn_unlinked {
202+ port. duplex_stream . recv ( ) ;
203+ fail ! ( )
204+ }
205+ chan. send ( ( ) ) ;
206+ }
107207}
0 commit comments