@@ -8,24 +8,30 @@ use futures_core::ready;
88use futures_core:: stream:: { FusedStream , Stream } ;
99use futures_core:: task:: { Context , Poll } ;
1010
11+ use pin_project_lite:: pin_project;
12+
1113use super :: assert_stream;
14+ use crate :: stream:: futures_unordered:: { IntoIter , Iter , IterMut , IterPinMut , IterPinRef } ;
1215use crate :: stream:: { FuturesUnordered , StreamExt , StreamFuture } ;
1316
14- /// An unbounded set of streams
15- ///
16- /// This "combinator" provides the ability to maintain a set of streams
17- /// and drive them all to completion.
18- ///
19- /// Streams are pushed into this set and their realized values are
20- /// yielded as they become ready. Streams will only be polled when they
21- /// generate notifications. This allows to coordinate a large number of streams.
22- ///
23- /// Note that you can create a ready-made `SelectAll` via the
24- /// `select_all` function in the `stream` module, or you can start with an
25- /// empty set with the `SelectAll::new` constructor.
26- #[ must_use = "streams do nothing unless polled" ]
27- pub struct SelectAll < St > {
28- inner : FuturesUnordered < StreamFuture < St > > ,
17+ pin_project ! {
18+ /// An unbounded set of streams
19+ ///
20+ /// This "combinator" provides the ability to maintain a set of streams
21+ /// and drive them all to completion.
22+ ///
23+ /// Streams are pushed into this set and their realized values are
24+ /// yielded as they become ready. Streams will only be polled when they
25+ /// generate notifications. This allows to coordinate a large number of streams.
26+ ///
27+ /// Note that you can create a ready-made `SelectAll` via the
28+ /// `select_all` function in the `stream` module, or you can start with an
29+ /// empty set with the `SelectAll::new` constructor.
30+ #[ must_use = "streams do nothing unless polled" ]
31+ pub struct SelectAll <St > {
32+ #[ pin]
33+ inner: FuturesUnordered <StreamFuture <St >>,
34+ }
2935}
3036
3137impl < St : Debug > Debug for SelectAll < St > {
@@ -64,6 +70,26 @@ impl<St: Stream + Unpin> SelectAll<St> {
6470 pub fn push ( & mut self , stream : St ) {
6571 self . inner . push ( stream. into_future ( ) ) ;
6672 }
73+
74+ /// Returns an iterator that allows inspecting each future in the set.
75+ pub fn iter ( & self ) -> Iter < ' _ , StreamFuture < St > > {
76+ self . inner . iter ( )
77+ }
78+
79+ /// Returns an iterator that allows inspecting each future in the set.
80+ pub fn iter_pin_ref ( self : Pin < & ' _ Self > ) -> IterPinRef < ' _ , StreamFuture < St > > {
81+ self . project_ref ( ) . inner . iter_pin_ref ( )
82+ }
83+
84+ /// Returns an iterator that allows modifying each future in the set.
85+ pub fn iter_mut ( & mut self ) -> IterMut < ' _ , StreamFuture < St > > {
86+ self . inner . iter_mut ( )
87+ }
88+
89+ /// Returns an iterator that allows modifying each future in the set.
90+ pub fn iter_pin_mut ( self : Pin < & mut Self > ) -> IterPinMut < ' _ , StreamFuture < St > > {
91+ self . project ( ) . inner . iter_pin_mut ( )
92+ }
6793}
6894
6995impl < St : Stream + Unpin > Default for SelectAll < St > {
@@ -139,3 +165,30 @@ impl<St: Stream + Unpin> Extend<St> for SelectAll<St> {
139165 }
140166 }
141167}
168+
169+ impl < St : Stream + Unpin > IntoIterator for SelectAll < St > {
170+ type Item = StreamFuture < St > ;
171+ type IntoIter = IntoIter < StreamFuture < St > > ;
172+
173+ fn into_iter ( self ) -> Self :: IntoIter {
174+ self . inner . into_iter ( )
175+ }
176+ }
177+
178+ impl < ' a , St : Stream + Unpin > IntoIterator for & ' a SelectAll < St > {
179+ type Item = & ' a StreamFuture < St > ;
180+ type IntoIter = Iter < ' a , StreamFuture < St > > ;
181+
182+ fn into_iter ( self ) -> Self :: IntoIter {
183+ self . iter ( )
184+ }
185+ }
186+
187+ impl < ' a , St : Stream + Unpin > IntoIterator for & ' a mut SelectAll < St > {
188+ type Item = & ' a mut StreamFuture < St > ;
189+ type IntoIter = IterMut < ' a , StreamFuture < St > > ;
190+
191+ fn into_iter ( self ) -> Self :: IntoIter {
192+ self . iter_mut ( )
193+ }
194+ }
0 commit comments