File tree Expand file tree Collapse file tree 6 files changed +51
-8
lines changed Expand file tree Collapse file tree 6 files changed +51
-8
lines changed Original file line number Diff line number Diff line change @@ -191,7 +191,7 @@ impl TcpWatcher {
191191 TcpWatcher {
192192 home : home,
193193 handle : handle,
194- stream : StreamWatcher :: new ( handle) ,
194+ stream : StreamWatcher :: new ( handle, true ) ,
195195 refcount : Refcount :: new ( ) ,
196196 read_access : AccessTimeout :: new ( ) ,
197197 write_access : AccessTimeout :: new ( ) ,
@@ -278,7 +278,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
278278 fn clone ( & self ) -> Box < rtio:: RtioTcpStream + Send > {
279279 box TcpWatcher {
280280 handle : self . handle ,
281- stream : StreamWatcher :: new ( self . handle ) ,
281+ stream : StreamWatcher :: new ( self . handle , false ) ,
282282 home : self . home . clone ( ) ,
283283 refcount : self . refcount . clone ( ) ,
284284 read_access : self . read_access . clone ( ) ,
Original file line number Diff line number Diff line change @@ -67,7 +67,7 @@ impl PipeWatcher {
6767 handle
6868 } ;
6969 PipeWatcher {
70- stream : StreamWatcher :: new ( handle) ,
70+ stream : StreamWatcher :: new ( handle, true ) ,
7171 home : home,
7272 defused : false ,
7373 refcount : Refcount :: new ( ) ,
@@ -131,7 +131,7 @@ impl rtio::RtioPipe for PipeWatcher {
131131
132132 fn clone ( & self ) -> Box < rtio:: RtioPipe + Send > {
133133 box PipeWatcher {
134- stream : StreamWatcher :: new ( self . stream . handle ) ,
134+ stream : StreamWatcher :: new ( self . stream . handle , false ) ,
135135 defused : false ,
136136 home : self . home . clone ( ) ,
137137 refcount : self . refcount . clone ( ) ,
Original file line number Diff line number Diff line change @@ -59,8 +59,11 @@ impl StreamWatcher {
5959 // will be manipulated on each of the methods called on this watcher.
6060 // Wrappers should ensure to always reset the field to an appropriate value
6161 // if they rely on the field to perform an action.
62- pub fn new ( stream : * mut uvll:: uv_stream_t ) -> StreamWatcher {
63- unsafe { uvll:: set_data_for_uv_handle ( stream, 0 as * mut int ) }
62+ pub fn new ( stream : * mut uvll:: uv_stream_t ,
63+ init : bool ) -> StreamWatcher {
64+ if init {
65+ unsafe { uvll:: set_data_for_uv_handle ( stream, 0 as * mut int ) }
66+ }
6467 StreamWatcher {
6568 handle : stream,
6669 last_write_req : None ,
Original file line number Diff line number Diff line change @@ -56,7 +56,7 @@ impl TtyWatcher {
5656 let handle = UvHandle :: alloc ( None :: < TtyWatcher > , uvll:: UV_TTY ) ;
5757 let mut watcher = TtyWatcher {
5858 tty : handle,
59- stream : StreamWatcher :: new ( handle) ,
59+ stream : StreamWatcher :: new ( handle, true ) ,
6060 home : io. make_handle ( ) ,
6161 fd : fd,
6262 } ;
Original file line number Diff line number Diff line change @@ -1360,4 +1360,44 @@ mod test {
13601360
13611361 rx2. recv( ) ;
13621362 } )
1363+
1364+ iotest ! ( fn clone_while_reading( ) {
1365+ let addr = next_test_ip6( ) ;
1366+ let listen = TcpListener :: bind( addr. ip. to_str( ) . as_slice( ) , addr. port) ;
1367+ let mut accept = listen. listen( ) . unwrap( ) ;
1368+
1369+ // Enqueue a task to write to a socket
1370+ let ( tx, rx) = channel( ) ;
1371+ let ( txdone, rxdone) = channel( ) ;
1372+ let txdone2 = txdone. clone( ) ;
1373+ spawn( proc( ) {
1374+ let mut tcp = TcpStream :: connect( addr. ip. to_str( ) . as_slice( ) ,
1375+ addr. port) . unwrap( ) ;
1376+ rx. recv( ) ;
1377+ tcp. write_u8( 0 ) . unwrap( ) ;
1378+ txdone2. send( ( ) ) ;
1379+ } ) ;
1380+
1381+ // Spawn off a reading clone
1382+ let tcp = accept. accept( ) . unwrap( ) ;
1383+ let tcp2 = tcp. clone( ) ;
1384+ let txdone3 = txdone. clone( ) ;
1385+ spawn( proc( ) {
1386+ let mut tcp2 = tcp2;
1387+ tcp2. read_u8( ) . unwrap( ) ;
1388+ txdone3. send( ( ) ) ;
1389+ } ) ;
1390+
1391+ // Try to ensure that the reading clone is indeed reading
1392+ for _ in range( 0 i, 50 ) {
1393+ :: task:: deschedule( ) ;
1394+ }
1395+
1396+ // clone the handle again while it's reading, then let it finish the
1397+ // read.
1398+ let _ = tcp. clone( ) ;
1399+ tx. send( ( ) ) ;
1400+ rxdone. recv( ) ;
1401+ rxdone. recv( ) ;
1402+ } )
13631403}
Original file line number Diff line number Diff line change @@ -649,7 +649,7 @@ fn task_abort_no_kill_runtime() {
649649 use std:: io:: timer;
650650 use mem;
651651
652- let mut tb = TaskBuilder :: new ( ) ;
652+ let tb = TaskBuilder :: new ( ) ;
653653 let rx = tb. try_future ( proc ( ) { } ) ;
654654 mem:: drop ( rx) ;
655655 timer:: sleep ( 1000 ) ;
You can’t perform that action at this time.
0 commit comments