|
2 | 2 |
|
3 | 3 | use crate::cell::UnsafeCell; |
4 | 4 | use crate::future::{poll_fn, Future}; |
| 5 | +use crate::mem; |
5 | 6 | use crate::pin::Pin; |
6 | | -use crate::task::Poll; |
| 7 | +use crate::task::{Context, Poll}; |
7 | 8 |
|
8 | 9 | /// Polls multiple futures simultaneously, returning a tuple |
9 | 10 | /// of all results once complete. |
@@ -70,64 +71,77 @@ pub macro join { |
70 | 71 | $( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )* |
71 | 72 | }, |
72 | 73 | @rest: () |
73 | | - ) => {{ |
| 74 | + ) => { |
74 | 75 | async move { |
75 | | - // The futures and whether they have completed |
76 | | - let mut state = ( $( UnsafeCell::new(($fut, false)), )* ); |
77 | | - |
78 | | - // Make sure the futures don't panic |
79 | | - // if polled after completion, and |
80 | | - // store their output separately |
81 | | - let mut futures = ($( |
82 | | - ({ |
83 | | - let ( $($pos,)* state, .. ) = &state; |
84 | | - |
85 | | - poll_fn(move |cx| { |
86 | | - // SAFETY: each future borrows a distinct element |
87 | | - // of the tuple |
88 | | - let (fut, done) = unsafe { &mut *state.get() }; |
89 | | - |
90 | | - if *done { |
91 | | - return Poll::Ready(None) |
92 | | - } |
93 | | - |
94 | | - // SAFETY: The futures are never moved |
95 | | - match unsafe { Pin::new_unchecked(fut).poll(cx) } { |
96 | | - Poll::Ready(val) => { |
97 | | - *done = true; |
98 | | - Poll::Ready(Some(val)) |
99 | | - } |
100 | | - Poll::Pending => Poll::Pending |
101 | | - } |
102 | | - }) |
103 | | - }, None), |
104 | | - )*); |
| 76 | + let mut futures = ( $( MaybeDone::Future($fut), )* ); |
105 | 77 |
|
106 | 78 | poll_fn(move |cx| { |
107 | 79 | let mut done = true; |
108 | 80 |
|
109 | 81 | $( |
110 | | - let ( $($pos,)* (fut, out), .. ) = &mut futures; |
| 82 | + let ( $($pos,)* fut, .. ) = &mut futures; |
111 | 83 |
|
112 | 84 | // SAFETY: The futures are never moved |
113 | | - match unsafe { Pin::new_unchecked(fut).poll(cx) } { |
114 | | - Poll::Ready(Some(val)) => *out = Some(val), |
115 | | - // the future was already done |
116 | | - Poll::Ready(None) => {}, |
117 | | - Poll::Pending => done = false, |
118 | | - } |
| 85 | + done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() }; |
119 | 86 | )* |
120 | 87 |
|
121 | 88 | if done { |
122 | 89 | // Extract all the outputs |
123 | 90 | Poll::Ready(($({ |
124 | | - let ( $($pos,)* (_, val), .. ) = &mut futures; |
125 | | - val.unwrap() |
| 91 | + let ( $($pos,)* fut, .. ) = &mut futures; |
| 92 | + |
| 93 | + fut.take_output().unwrap() |
126 | 94 | }),*)) |
127 | 95 | } else { |
128 | 96 | Poll::Pending |
129 | 97 | } |
130 | 98 | }).await |
131 | 99 | } |
132 | | - }} |
| 100 | + } |
| 101 | +} |
| 102 | + |
| 103 | +/// Future used by `join!` that stores it's output to |
| 104 | +/// be later taken and doesn't panic when polled after ready. |
| 105 | +/// |
| 106 | +/// This type is public in a private module for use by the macro. |
| 107 | +#[allow(missing_debug_implementations)] |
| 108 | +#[unstable(feature = "future_join", issue = "91642")] |
| 109 | +pub enum MaybeDone<F: Future> { |
| 110 | + Future(F), |
| 111 | + Done(F::Output), |
| 112 | + Took, |
| 113 | +} |
| 114 | + |
| 115 | +#[unstable(feature = "future_join", issue = "91642")] |
| 116 | +impl<F: Future> MaybeDone<F> { |
| 117 | + pub fn take_output(&mut self) -> Option<F::Output> { |
| 118 | + match &*self { |
| 119 | + MaybeDone::Done(_) => match mem::replace(self, Self::Took) { |
| 120 | + MaybeDone::Done(val) => Some(val), |
| 121 | + _ => unreachable!(), |
| 122 | + }, |
| 123 | + _ => None, |
| 124 | + } |
| 125 | + } |
| 126 | +} |
| 127 | + |
| 128 | +#[unstable(feature = "future_join", issue = "91642")] |
| 129 | +impl<F: Future> Future for MaybeDone<F> { |
| 130 | + type Output = (); |
| 131 | + |
| 132 | + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 133 | + // SAFETY: pinning in structural for `f` |
| 134 | + unsafe { |
| 135 | + match self.as_mut().get_unchecked_mut() { |
| 136 | + MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) { |
| 137 | + Poll::Ready(val) => self.set(Self::Done(val)), |
| 138 | + Poll::Pending => return Poll::Pending, |
| 139 | + }, |
| 140 | + MaybeDone::Done(_) => {} |
| 141 | + MaybeDone::Took => unreachable!(), |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + Poll::Ready(()) |
| 146 | + } |
133 | 147 | } |
0 commit comments