@@ -15,7 +15,7 @@ use futures_core::stream::TryStream;
1515#[ cfg( feature = "alloc" ) ]
1616use futures_core:: stream:: { BoxStream , LocalBoxStream } ;
1717use futures_core:: {
18- future:: Future ,
18+ future:: { Future , TryFuture } ,
1919 stream:: { FusedStream , Stream } ,
2020 task:: { Context , Poll } ,
2121} ;
@@ -149,6 +149,14 @@ mod then;
149149#[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
150150pub use self :: then:: Then ;
151151
152+ mod try_for_each;
153+ #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
154+ pub use self :: try_for_each:: TryForEach ;
155+
156+ mod try_fold;
157+ #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
158+ pub use self :: try_fold:: TryFold ;
159+
152160mod zip;
153161#[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
154162pub use self :: zip:: Zip ;
@@ -197,6 +205,12 @@ cfg_target_has_atomic! {
197205 #[ cfg( feature = "alloc" ) ]
198206 #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
199207 pub use self :: split:: { SplitStream , SplitSink , ReuniteError } ;
208+
209+ #[ cfg( feature = "alloc" ) ]
210+ mod try_for_each_concurrent;
211+ #[ cfg( feature = "alloc" ) ]
212+ #[ allow( unreachable_pub) ] // https://github.com/rust-lang/rust/issues/57411
213+ pub use self :: try_for_each_concurrent:: TryForEachConcurrent ;
200214}
201215
202216#[ cfg( feature = "std" ) ]
@@ -934,6 +948,143 @@ pub trait StreamExt: Stream {
934948 assert_future :: < ( ) , _ > ( ForEachConcurrent :: new ( self , limit. into ( ) , f) )
935949 }
936950
951+ /// Attempt to execute an accumulating asynchronous computation over a
952+ /// stream, collecting all the values into one final result.
953+ ///
954+ /// This combinator will accumulate all values returned by this stream
955+ /// according to the closure provided. The initial state is also provided to
956+ /// this method and then is returned again by each execution of the closure.
957+ /// Once the entire stream has been exhausted the returned future will
958+ /// resolve to this value.
959+ ///
960+ /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but
961+ /// will exit early if an error is encountered in the provided closure.
962+ ///
963+ /// # Examples
964+ ///
965+ /// ```
966+ /// # futures::executor::block_on(async {
967+ /// use futures::stream::{self, StreamExt};
968+ ///
969+ /// let number_stream = stream::iter(vec![1, 2]);
970+ /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok::<i32, i32>(acc + x) });
971+ /// assert_eq!(sum.await, Ok(3));
972+ ///
973+ /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
974+ /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x?) });
975+ /// assert_eq!(sum.await, Err(2));
976+ /// # })
977+ /// ```
978+ fn try_fold < T , Fut , F > ( self , init : T , f : F ) -> TryFold < Self , Fut , T , F >
979+ where
980+ F : FnMut ( T , Self :: Item ) -> Fut ,
981+ Fut : TryFuture < Ok = T > ,
982+ Self : Sized ,
983+ {
984+ assert_future :: < Result < T , Fut :: Error > , _ > ( TryFold :: new ( self , f, init) )
985+ }
986+
987+ /// Attempts to run this stream to completion, executing the provided
988+ /// asynchronous closure for each element on the stream.
989+ ///
990+ /// The provided closure will be called for each item this stream produces,
991+ /// yielding a future. That future will then be executed to completion
992+ /// before moving on to the next item.
993+ ///
994+ /// The returned value is a [`Future`](futures_core::future::Future) where
995+ /// the [`Output`](futures_core::future::Future::Output) type is
996+ /// `Result<(), Fut::Error>`. If any of the intermediate futures returns
997+ /// an error, this future will return immediately with an error.
998+ ///
999+ /// # Examples
1000+ ///
1001+ /// ```
1002+ /// # futures::executor::block_on(async {
1003+ /// use futures::future;
1004+ /// use futures::stream::{self, StreamExt};
1005+ ///
1006+ /// let mut x = 0i32;
1007+ ///
1008+ /// {
1009+ /// let fut = stream::repeat(1).try_for_each(|item| {
1010+ /// x += item;
1011+ /// future::ready(if x == 3 { Err(()) } else { Ok(()) })
1012+ /// });
1013+ /// assert_eq!(fut.await, Err(()));
1014+ /// }
1015+ ///
1016+ /// assert_eq!(x, 3);
1017+ /// # })
1018+ /// ```
1019+ fn try_for_each < Fut , F > ( self , f : F ) -> TryForEach < Self , Fut , F >
1020+ where
1021+ F : FnMut ( Self :: Item ) -> Fut ,
1022+ Fut : TryFuture < Ok = ( ) > ,
1023+ Self : Sized ,
1024+ {
1025+ assert_future :: < Result < ( ) , Fut :: Error > , _ > ( TryForEach :: new ( self , f) )
1026+ }
1027+
1028+ /// Attempts to run this stream to completion, executing the provided asynchronous
1029+ /// closure for each element on the stream concurrently as elements become
1030+ /// available, exiting as soon as an error occurs.
1031+ ///
1032+ /// This is similar to
1033+ /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
1034+ /// but will resolve to an error immediately if the provided closure returns
1035+ /// an error.
1036+ ///
1037+ /// This method is only available when the `std` or `alloc` feature of this
1038+ /// library is activated, and it is activated by default.
1039+ ///
1040+ /// # Examples
1041+ ///
1042+ /// ```
1043+ /// # futures::executor::block_on(async {
1044+ /// use futures::channel::oneshot;
1045+ /// use futures::stream::{self, StreamExt};
1046+ ///
1047+ /// let (tx1, rx1) = oneshot::channel();
1048+ /// let (tx2, rx2) = oneshot::channel();
1049+ /// let (_tx3, rx3) = oneshot::channel();
1050+ ///
1051+ /// let stream = stream::iter(vec![rx1, rx2, rx3]);
1052+ /// let fut = stream.try_for_each_concurrent(
1053+ /// /* limit */ 2,
1054+ /// |rx| async move {
1055+ /// let res: Result<(), oneshot::Canceled> = rx.await;
1056+ /// res
1057+ /// }
1058+ /// );
1059+ ///
1060+ /// tx1.send(()).unwrap();
1061+ /// // Drop the second sender so that `rx2` resolves to `Canceled`.
1062+ /// drop(tx2);
1063+ ///
1064+ /// // The final result is an error because the second future
1065+ /// // resulted in an error.
1066+ /// assert_eq!(Err(oneshot::Canceled), fut.await);
1067+ /// # })
1068+ /// ```
1069+ #[ cfg_attr( feature = "cfg-target-has-atomic" , cfg( target_has_atomic = "ptr" ) ) ]
1070+ #[ cfg( feature = "alloc" ) ]
1071+ fn try_for_each_concurrent < Fut , F , E > (
1072+ self ,
1073+ limit : impl Into < Option < usize > > ,
1074+ f : F ,
1075+ ) -> TryForEachConcurrent < Self , Fut , F >
1076+ where
1077+ F : FnMut ( Self :: Item ) -> Fut ,
1078+ Fut : Future < Output = Result < ( ) , E > > ,
1079+ Self : Sized ,
1080+ {
1081+ assert_future :: < Result < ( ) , E > , _ > ( TryForEachConcurrent :: new (
1082+ self ,
1083+ limit. into ( ) ,
1084+ f,
1085+ ) )
1086+ }
1087+
9371088 /// Creates a new stream of at most `n` items of the underlying stream.
9381089 ///
9391090 /// Once `n` items have been yielded from this stream then it will always
0 commit comments