@@ -123,11 +123,13 @@ cfg_unstable! {
123123 pub use flatten:: Flatten ;
124124 pub use flat_map:: FlatMap ;
125125 pub use timeout:: { TimeoutError , Timeout } ;
126+ pub use throttle:: Throttle ;
126127
127128 mod merge;
128129 mod flatten;
129130 mod flat_map;
130131 mod timeout;
132+ mod throttle;
131133}
132134
133135extension_trait ! {
@@ -313,6 +315,56 @@ extension_trait! {
313315 TakeWhile :: new( self , predicate)
314316 }
315317
318+ #[ doc = r#"
319+ Limit the amount of items yielded per timeslice in a stream.
320+
321+ This stream does not drop any items, but will only limit the rate at which items pass through.
322+ # Examples
323+ ```
324+ # fn main() { async_std::task::block_on(async {
325+ #
326+ use async_std::prelude::*;
327+ use async_std::stream;
328+ use std::time::{Duration, Instant};
329+
330+ let start = Instant::now();
331+
332+ // emit value every 5 milliseconds
333+ let s = stream::interval(Duration::from_millis(5))
334+ .enumerate()
335+ .take(3);
336+
337+ // throttle for 10 milliseconds
338+ let mut s = s.throttle(Duration::from_millis(10));
339+
340+ assert_eq!(s.next().await, Some((0, ())));
341+ let duration_ms = start.elapsed().as_millis();
342+ assert!(duration_ms >= 5);
343+
344+ assert_eq!(s.next().await, Some((1, ())));
345+ let duration_ms = start.elapsed().as_millis();
346+ assert!(duration_ms >= 15);
347+
348+ assert_eq!(s.next().await, Some((2, ())));
349+ let duration_ms = start.elapsed().as_millis();
350+ assert!(duration_ms >= 25);
351+
352+ assert_eq!(s.next().await, None);
353+ let duration_ms = start.elapsed().as_millis();
354+ assert!(duration_ms >= 35);
355+ #
356+ # }) }
357+ ```
358+ "# ]
359+ #[ cfg( all( feature = "default" , feature = "unstable" ) ) ]
360+ #[ cfg_attr( feature = "docs" , doc( cfg( unstable) ) ) ]
361+ fn throttle( self , d: Duration ) -> Throttle <Self >
362+ where
363+ Self : Sized ,
364+ {
365+ Throttle :: new( self , d)
366+ }
367+
316368 #[ doc = r#"
317369 Creates a stream that yields each `step`th element.
318370
0 commit comments