File tree Expand file tree Collapse file tree 5 files changed +61
-9
lines changed Expand file tree Collapse file tree 5 files changed +61
-9
lines changed Original file line number Diff line number Diff line change @@ -67,6 +67,7 @@ cfg_if! {
6767
6868 mod vec;
6969 mod result;
70+ mod option;
7071 }
7172}
7273
Original file line number Diff line number Diff line change 1+ use std:: pin:: Pin ;
2+
3+ use crate :: prelude:: * ;
4+ use crate :: stream:: { FromStream , IntoStream } ;
5+
6+ impl < T , V > FromStream < Option < T > > for Option < V >
7+ where
8+ V : FromStream < T > ,
9+ {
10+ /// Takes each element in the stream: if it is `None`, no further
11+ /// elements are taken, and `None` is returned. Should no `None`
12+ /// occur, a container with the values of each `Option` is returned.
13+ #[ inline]
14+ fn from_stream < ' a , S : IntoStream < Item = Option < T > > > (
15+ stream : S ,
16+ ) -> Pin < Box < dyn core:: future:: Future < Output = Self > + ' a > >
17+ where
18+ <S as IntoStream >:: IntoStream : ' a ,
19+ {
20+ let stream = stream. into_stream ( ) ;
21+
22+ Box :: pin ( async move {
23+ pin_utils:: pin_mut!( stream) ;
24+
25+ // Using `scan` here because it is able to stop the stream early
26+ // if a failure occurs
27+ let mut found_error = false ;
28+ let out: V = stream
29+ . scan ( ( ) , |_, elem| {
30+ match elem {
31+ Some ( elem) => Some ( elem) ,
32+ None => {
33+ found_error = true ;
34+ // Stop processing the stream on error
35+ None
36+ }
37+ }
38+ } )
39+ . collect ( )
40+ . await ;
41+
42+ if found_error { None } else { Some ( out) }
43+ } )
44+ }
45+ }
Original file line number Diff line number Diff line change 1+ //! The Rust core optional value type
2+ //!
3+ //! This module provides the `Option<T>` type for returning and
4+ //! propagating optional values.
5+
6+ mod from_stream;
7+
8+ #[ doc( inline) ]
9+ pub use std:: option:: Option ;
Original file line number Diff line number Diff line change 1919 {
2020 let stream = stream. into_stream ( ) ;
2121
22- Pin :: from ( Box :: new ( async move {
22+ Box :: pin ( async move {
2323 pin_utils:: pin_mut!( stream) ;
2424
2525 // Using `scan` here because it is able to stop the stream early
4343 Some ( err) => Err ( err) ,
4444 None => Ok ( out) ,
4545 }
46- } ) )
46+ } )
4747 }
4848}
Original file line number Diff line number Diff line change 11use std:: pin:: Pin ;
22
3- use crate :: prelude:: * ;
4- use crate :: stream:: { FromStream , IntoStream } ;
3+ use crate :: stream:: { Extend , FromStream , IntoStream } ;
54
65impl < T > FromStream < T > for Vec < T > {
76 #[ inline]
@@ -13,14 +12,12 @@ impl<T> FromStream<T> for Vec<T> {
1312 {
1413 let stream = stream. into_stream ( ) ;
1514
16- Pin :: from ( Box :: new ( async move {
15+ Box :: pin ( async move {
1716 pin_utils:: pin_mut!( stream) ;
1817
1918 let mut out = vec ! [ ] ;
20- while let Some ( item) = stream. next ( ) . await {
21- out. push ( item) ;
22- }
19+ out. stream_extend ( stream) . await ;
2320 out
24- } ) )
21+ } )
2522 }
2623}
You can’t perform that action at this time.
0 commit comments