File tree Expand file tree Collapse file tree 3 files changed +70
-40
lines changed Expand file tree Collapse file tree 3 files changed +70
-40
lines changed Original file line number Diff line number Diff line change @@ -17,26 +17,34 @@ where
1717 let stream = stream. into_stream ( ) ;
1818
1919 Box :: pin ( async move {
20- // Using `scan ` here because it is able to stop the stream early
20+ // Using `take_while ` here because it is able to stop the stream early
2121 // if a failure occurs
22+ let mut is_error = false ;
2223 let mut found_error = None ;
2324 let out: V = stream
24- . scan ( & mut found_error, |error, elem| {
25- match elem {
26- Ok ( elem) => Some ( elem) ,
27- Err ( err) => {
28- * * error = Some ( err) ;
29- // Stop processing the stream on error
30- None
31- }
25+ . take_while ( |elem| {
26+ // Stop processing the stream on `Err`
27+ !is_error
28+ && ( elem. is_ok ( ) || {
29+ is_error = true ;
30+ // Capture first `Err`
31+ true
32+ } )
33+ } )
34+ . filter_map ( |elem| match elem {
35+ Ok ( value) => Some ( value) ,
36+ Err ( err) => {
37+ found_error = Some ( err) ;
38+ None
3239 }
3340 } )
3441 . collect ( )
3542 . await ;
3643
37- match found_error {
38- Some ( err) => Err ( err) ,
39- None => Ok ( out) ,
44+ if is_error {
45+ Err ( found_error. unwrap ( ) )
46+ } else {
47+ Ok ( out)
4048 }
4149 } )
4250 }
Original file line number Diff line number Diff line change @@ -40,24 +40,35 @@ where
4040 S : Stream < Item = Result < U , E > > + ' a ,
4141 {
4242 Box :: pin ( async move {
43- // Using `scan ` here because it is able to stop the stream early
43+ // Using `take_while ` here because it is able to stop the stream early
4444 // if a failure occurs
45+ let mut is_error = false ;
4546 let mut found_error = None ;
46- let out = <T as Product < U > >:: product ( stream. scan ( & mut found_error, |error, elem| {
47- match elem {
48- Ok ( elem) => Some ( elem) ,
49- Err ( err) => {
50- * * error = Some ( err) ;
51- // Stop processing the stream on error
52- None
53- }
54- }
55- } ) )
47+ let out = <T as Product < U > >:: product (
48+ stream
49+ . take_while ( |elem| {
50+ // Stop processing the stream on `Err`
51+ !is_error
52+ && ( elem. is_ok ( ) || {
53+ is_error = true ;
54+ // Capture first `Err`
55+ true
56+ } )
57+ } )
58+ . filter_map ( |elem| match elem {
59+ Ok ( value) => Some ( value) ,
60+ Err ( err) => {
61+ found_error = Some ( err) ;
62+ None
63+ }
64+ } ) ,
65+ )
5666 . await ;
5767
58- match found_error {
59- Some ( err) => Err ( err) ,
60- None => Ok ( out) ,
68+ if is_error {
69+ Err ( found_error. unwrap ( ) )
70+ } else {
71+ Ok ( out)
6172 }
6273 } )
6374 }
Original file line number Diff line number Diff line change @@ -40,24 +40,35 @@ where
4040 S : Stream < Item = Result < U , E > > + ' a ,
4141 {
4242 Box :: pin ( async move {
43- // Using `scan ` here because it is able to stop the stream early
43+ // Using `take_while ` here because it is able to stop the stream early
4444 // if a failure occurs
45+ let mut is_error = false ;
4546 let mut found_error = None ;
46- let out = <T as Sum < U > >:: sum ( stream. scan ( & mut found_error, |error, elem| {
47- match elem {
48- Ok ( elem) => Some ( elem) ,
49- Err ( err) => {
50- * * error = Some ( err) ;
51- // Stop processing the stream on error
52- None
53- }
54- }
55- } ) )
47+ let out = <T as Sum < U > >:: sum (
48+ stream
49+ . take_while ( |elem| {
50+ // Stop processing the stream on `Err`
51+ !is_error
52+ && ( elem. is_ok ( ) || {
53+ is_error = true ;
54+ // Capture first `Err`
55+ true
56+ } )
57+ } )
58+ . filter_map ( |elem| match elem {
59+ Ok ( value) => Some ( value) ,
60+ Err ( err) => {
61+ found_error = Some ( err) ;
62+ None
63+ }
64+ } ) ,
65+ )
5666 . await ;
5767
58- match found_error {
59- Some ( err) => Err ( err) ,
60- None => Ok ( out) ,
68+ if is_error {
69+ Err ( found_error. unwrap ( ) )
70+ } else {
71+ Ok ( out)
6172 }
6273 } )
6374 }
You can’t perform that action at this time.
0 commit comments