@@ -4,6 +4,7 @@ use crate::cell::UnsafeCell;
44use crate :: future:: { poll_fn, Future } ;
55use crate :: pin:: Pin ;
66use crate :: task:: Poll ;
7+
78/// Polls multiple futures simultaneously, returning a tuple
89/// of all results once complete.
910///
@@ -53,95 +54,78 @@ pub macro join {
5354 @count : ( $( $count: tt) * ) ,
5455 // Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
5556 @futures : { $( $fut: tt) * } ,
56- // The future currently being expanded, and the rest
57+ // Take a future from @rest to expand
5758 @rest : ( $current: expr, $( $rest: tt) * )
5859 ) => {
5960 join ! {
60- @count: ( $( $count) * _) , // Add to the count
61- @futures: { $( $fut) * $current => ( $( $count) * ) , } , // Add the future from @rest with it's position
62- @rest: ( $( $rest) * ) // And leave the rest
61+ @count: ( $( $count) * _) ,
62+ @futures: { $( $fut) * $current => ( $( $count) * ) , } ,
63+ @rest: ( $( $rest) * )
6364 }
6465 } ,
6566 // Now generate the output future
6667 (
6768 @count: ( $( $count: tt) * ) ,
6869 @futures : {
69- $( $fut: expr => ( $( $pos: tt) * ) , ) *
70+ $( $( @$f : tt ) ? $ fut: expr => ( $( $pos: tt) * ) , ) *
7071 } ,
7172 @rest : ( )
7273 ) => { {
73- let mut futures = ( $( MaybeDone :: Future ( $fut) , ) * ) ;
74+ // The futures and whether they have completed
75+ let mut state = ( $( UnsafeCell :: new ( ( $fut, false ) ) , ) * ) ;
76+
77+ // Make sure the futures don't panic
78+ // if polled after completion, and
79+ // store their output separately
80+ let mut futures = ( $(
81+ ( {
82+ let ( $( $pos, ) * state, .. ) = & state;
83+
84+ poll_fn ( move |cx| {
85+ // SAFETY: each future borrows a distinct element
86+ // of the tuple
87+ let ( fut, done) = unsafe { & mut * state. get ( ) } ;
88+
89+ if * done {
90+ return Poll :: Ready ( None )
91+ }
92+
93+ // SAFETY: The futures are never moved
94+ match unsafe { Pin :: new_unchecked ( fut) . poll ( cx) } {
95+ Poll :: Ready ( val) => {
96+ * done = true ;
97+ Poll :: Ready ( Some ( val) )
98+ }
99+ Poll :: Pending => Poll :: Pending
100+ }
101+ } )
102+ } , None ) ,
103+ ) * ) ;
74104
75105 poll_fn ( move |cx| {
76106 let mut done = true ;
77107
78108 $(
79- // Extract the future from the tuple
80- let ( $( $pos, ) * fut, .. ) = & mut futures;
109+ let ( $( $pos, ) * ( fut, out) , .. ) = & mut futures;
81110
82- // SAFETY: the futures are never moved
83- done &= unsafe { Pin :: new_unchecked ( fut) . poll ( cx) . is_ready ( ) } ;
111+ // SAFETY: The futures are never moved
112+ match unsafe { Pin :: new_unchecked ( fut) . poll ( cx) } {
113+ Poll :: Ready ( Some ( val) ) => * out = Some ( val) ,
114+ // the future was already done
115+ Poll :: Ready ( None ) => { } ,
116+ Poll :: Pending => done = false ,
117+ }
84118 ) *
85119
86120 if done {
121+ // Extract all the outputs
87122 Poll :: Ready ( ( $( {
88- let ( $( $pos, ) * fut, .. ) = & mut futures;
89-
90- // SAFETY: the futures are never moved
91- unsafe { Pin :: new_unchecked ( fut) . take_output ( ) . unwrap ( ) }
123+ let ( $( $pos, ) * ( _, val) , .. ) = & mut futures;
124+ val. unwrap ( )
92125 } ) , * ) )
93126 } else {
94127 Poll :: Pending
95128 }
96129 } ) . await
97130 } }
98131}
99-
100- /// Future used by `join!` that stores it's output to
101- /// be later taken and doesn' t panic when polled after ready.
102- #[ allow ( dead_code) ]
103- #[ unstable( feature = "future_join" , issue = "none" ) ]
104- enum MaybeDone < F : Future > {
105- Future ( F ) ,
106- Done ( F :: Output ) ,
107- Took ,
108- }
109-
110- #[ unstable( feature = "future_join" , issue = "none" ) ]
111- impl < F : Future + Unpin > Unpin for MaybeDone < F > { }
112-
113- #[ unstable( feature = "future_join" , issue = "none" ) ]
114- impl < F : Future > MaybeDone < F > {
115- #[ allow( dead_code) ]
116- fn take_output ( self : Pin < & mut Self > ) -> Option < F :: Output > {
117- unsafe {
118- match & * self {
119- MaybeDone :: Done ( _) => match mem:: replace ( self . get_unchecked_mut ( ) , Self :: Took ) {
120- MaybeDone :: Done ( val) => Some ( val) ,
121- _ => unreachable ! ( ) ,
122- } ,
123- _ => None ,
124- }
125- }
126- }
127- }
128-
129- #[ unstable( feature = "future_join" , issue = "none" ) ]
130- impl < F : Future > Future for MaybeDone < F > {
131- type Output = ( ) ;
132-
133- fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
134- unsafe {
135- match self . as_mut ( ) . get_unchecked_mut ( ) {
136- MaybeDone :: Future ( f) => match Pin :: new_unchecked ( f) . poll ( cx) {
137- Poll :: Ready ( val) => self . set ( Self :: Done ( val) ) ,
138- Poll :: Pending => return Poll :: Pending ,
139- } ,
140- MaybeDone :: Done ( _) => { }
141- MaybeDone :: Took => unreachable ! ( ) ,
142- }
143- }
144-
145- Poll :: Ready ( ( ) )
146- }
147- }
0 commit comments