@@ -18,11 +18,11 @@ pub struct PartialCmpFuture<L: Stream, R: Stream> {
1818 r_cache : Option < R :: Item > ,
1919}
2020
21- impl < L : Stream + Unpin , R : Stream + Unpin > Unpin for PartialCmpFuture < L , R > { }
22-
2321impl < L : Stream , R : Stream > PartialCmpFuture < L , R > {
2422 pin_utils:: unsafe_pinned!( l: Fuse <L >) ;
2523 pin_utils:: unsafe_pinned!( r: Fuse <R >) ;
24+ pin_utils:: unsafe_unpinned!( l_cache: Option <L :: Item >) ;
25+ pin_utils:: unsafe_unpinned!( r_cache: Option <R :: Item >) ;
2626
2727 pub ( super ) fn new ( l : L , r : R ) -> Self {
2828 PartialCmpFuture {
@@ -36,59 +36,57 @@ impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
3636
3737impl < L : Stream , R : Stream > Future for PartialCmpFuture < L , R >
3838where
39- L : Stream + Unpin + Sized ,
40- R : Stream + Unpin + Sized ,
39+ L : Stream + Sized ,
40+ R : Stream + Sized ,
4141 L :: Item : PartialOrd < R :: Item >
4242{
4343 type Output = Option < Ordering > ;
4444
4545 fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
46- // Short circuit logic
47- // Stream that completes earliest can be considered Less, etc
48- let l_complete = self . l . done && self . as_mut ( ) . l_cache . is_none ( ) ;
49- let r_complete = self . r . done && self . as_mut ( ) . r_cache . is_none ( ) ;
46+ loop {
47+ // Short circuit logic
48+ // Stream that completes earliest can be considered Less, etc
49+ let l_complete = self . l . done && self . as_mut ( ) . l_cache . is_none ( ) ;
50+ let r_complete = self . r . done && self . as_mut ( ) . r_cache . is_none ( ) ;
5051
51- if l_complete && r_complete {
52- return Poll :: Ready ( Some ( Ordering :: Equal ) )
53- } else if l_complete {
54- return Poll :: Ready ( Some ( Ordering :: Less ) )
55- } else if r_complete {
56- return Poll :: Ready ( Some ( Ordering :: Greater ) )
57- }
52+ if l_complete && r_complete {
53+ return Poll :: Ready ( Some ( Ordering :: Equal ) )
54+ } else if l_complete {
55+ return Poll :: Ready ( Some ( Ordering :: Less ) )
56+ } else if r_complete {
57+ return Poll :: Ready ( Some ( Ordering :: Greater ) )
58+ }
5859
59- // Get next value if possible and necesary
60- if !self . l . done && self . as_mut ( ) . l_cache . is_none ( ) {
61- let l_next = futures_core:: ready!( self . as_mut( ) . l( ) . poll_next( cx) ) ;
62- if let Some ( item) = l_next {
63- self . as_mut ( ) . l_cache = Some ( item) ;
60+ // Get next value if possible and necesary
61+ if !self . l . done && self . as_mut ( ) . l_cache . is_none ( ) {
62+ let l_next = futures_core:: ready!( self . as_mut( ) . l( ) . poll_next( cx) ) ;
63+ if let Some ( item) = l_next {
64+ * self . as_mut ( ) . l_cache ( ) = Some ( item) ;
65+ }
6466 }
65- }
6667
67- if !self . r . done && self . as_mut ( ) . r_cache . is_none ( ) {
68- let r_next = futures_core:: ready!( self . as_mut( ) . r( ) . poll_next( cx) ) ;
69- if let Some ( item) = r_next {
70- self . as_mut ( ) . r_cache = Some ( item) ;
68+ if !self . r . done && self . as_mut ( ) . r_cache . is_none ( ) {
69+ let r_next = futures_core:: ready!( self . as_mut( ) . r( ) . poll_next( cx) ) ;
70+ if let Some ( item) = r_next {
71+ * self . as_mut ( ) . r_cache ( ) = Some ( item) ;
72+ }
7173 }
72- }
7374
74- // Compare if both values are available.
75- if self . as_mut ( ) . l_cache . is_some ( ) && self . as_mut ( ) . r_cache . is_some ( ) {
76- let l_value = self . as_mut ( ) . l_cache . take ( ) . unwrap ( ) ;
77- let r_value = self . as_mut ( ) . r_cache . take ( ) . unwrap ( ) ;
78- let result = l_value. partial_cmp ( & r_value) ;
75+ // Compare if both values are available.
76+ if self . as_mut ( ) . l_cache . is_some ( ) && self . as_mut ( ) . r_cache . is_some ( ) {
77+ let l_value = self . as_mut ( ) . l_cache ( ) . take ( ) . unwrap ( ) ;
78+ let r_value = self . as_mut ( ) . r_cache ( ) . take ( ) . unwrap ( ) ;
79+ let result = l_value. partial_cmp ( & r_value) ;
7980
80- if let Some ( Ordering :: Equal ) = result {
81- // Reset cache to prepare for next comparison
82- self . as_mut ( ) . l_cache = None ;
83- self . as_mut ( ) . r_cache = None ;
84- } else {
85- // Return non equal value
86- return Poll :: Ready ( result) ;
87- }
81+ if let Some ( Ordering :: Equal ) = result {
82+ // Reset cache to prepare for next comparison
83+ * self . as_mut ( ) . l_cache ( ) = None ;
84+ * self . as_mut ( ) . r_cache ( ) = None ;
85+ } else {
86+ // Return non equal value
87+ return Poll :: Ready ( result) ;
88+ }
89+ }
8890 }
89-
90- // wakes task to pull the next item from stream
91- cx. waker ( ) . wake_by_ref ( ) ;
92- Poll :: Pending
9391 }
94- }
92+ }
0 commit comments