11use std:: cmp:: Ordering ;
2- use std:: pin:: Pin ;
32use std:: future:: Future ;
3+ use std:: pin:: Pin ;
44
55use pin_project_lite:: pin_project;
66
@@ -13,7 +13,7 @@ pin_project! {
1313 pub struct MinByKeyFuture <S , T , K > {
1414 #[ pin]
1515 stream: S ,
16- min: Option <T >,
16+ min: Option <( T , T ) >,
1717 key_by: K ,
1818 }
1919}
@@ -37,24 +37,32 @@ where
3737 type Output = Option < S :: Item > ;
3838
3939 fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
40+ fn key < B , T > ( mut f : impl FnMut ( & T ) -> B ) -> impl FnMut ( T ) -> ( B , T ) {
41+ move |x| ( f ( & x) , x)
42+ }
43+
4044 let this = self . project ( ) ;
4145 let next = futures_core:: ready!( this. stream. poll_next( cx) ) ;
4246
4347 match next {
4448 Some ( new) => {
45- let new = ( this. key_by ) ( & new) ;
49+ let ( key , value ) = key ( this. key_by ) ( new) ;
4650 cx. waker ( ) . wake_by_ref ( ) ;
51+
4752 match this. min . take ( ) {
48- None => * this. min = Some ( new ) ,
53+ None => * this. min = Some ( ( key , value ) ) ,
4954
50- Some ( old) => match new . cmp ( & old) {
51- Ordering :: Less => * this. min = Some ( new ) ,
55+ Some ( old) => match key . cmp ( & old. 0 ) {
56+ Ordering :: Less => * this. min = Some ( ( key , value ) ) ,
5257 _ => * this. min = Some ( old) ,
5358 } ,
5459 }
5560 Poll :: Pending
5661 }
57- None => Poll :: Ready ( this. min . take ( ) ) ,
62+ None => Poll :: Ready ( match this. min . take ( ) {
63+ None => None ,
64+ Some ( max) => Some ( max. 1 ) ,
65+ } ) ,
5866 }
5967 }
6068}
0 commit comments