1+ use std:: cmp:: Ordering ;
2+ use std:: pin:: Pin ;
3+
4+ use super :: fuse:: Fuse ;
5+ use crate :: future:: Future ;
6+ use crate :: prelude:: * ;
7+ use crate :: stream:: Stream ;
8+ use crate :: task:: { Context , Poll } ;
9+
10+ // Lexicographically compares the elements of this `Stream` with those
11+ // of another.
12+ #[ doc( hidden) ]
13+ #[ allow( missing_debug_implementations) ]
14+ pub struct PartialCmpFuture < L : Stream , R : Stream > {
15+ l : Fuse < L > ,
16+ r : Fuse < R > ,
17+ l_cache : Option < L :: Item > ,
18+ r_cache : Option < R :: Item > ,
19+ }
20+
21+ impl < L : Stream , R : Stream > PartialCmpFuture < L , R > {
22+ pin_utils:: unsafe_pinned!( l: Fuse <L >) ;
23+ pin_utils:: unsafe_pinned!( r: Fuse <R >) ;
24+ pin_utils:: unsafe_unpinned!( l_cache: Option <L :: Item >) ;
25+ pin_utils:: unsafe_unpinned!( r_cache: Option <R :: Item >) ;
26+
27+ pub ( super ) fn new ( l : L , r : R ) -> Self {
28+ PartialCmpFuture {
29+ l : l. fuse ( ) ,
30+ r : r. fuse ( ) ,
31+ l_cache : None ,
32+ r_cache : None ,
33+ }
34+ }
35+ }
36+
37+ impl < L : Stream , R : Stream > Future for PartialCmpFuture < L , R >
38+ where
39+ L : Stream + Sized ,
40+ R : Stream + Sized ,
41+ L :: Item : PartialOrd < R :: Item > ,
42+ {
43+ type Output = Option < Ordering > ;
44+
45+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
46+ loop {
47+ // Short circuit logic
48+ // Stream that completes earliest can be considered Less, etc
49+ let l_complete = self . l . done && self . as_mut ( ) . l_cache . is_none ( ) ;
50+ let r_complete = self . r . done && self . as_mut ( ) . r_cache . is_none ( ) ;
51+
52+ if l_complete && r_complete {
53+ return Poll :: Ready ( Some ( Ordering :: Equal ) ) ;
54+ } else if l_complete {
55+ return Poll :: Ready ( Some ( Ordering :: Less ) ) ;
56+ } else if r_complete {
57+ return Poll :: Ready ( Some ( Ordering :: Greater ) ) ;
58+ }
59+
60+ // Get next value if possible and necesary
61+ if !self . l . done && self . as_mut ( ) . l_cache . is_none ( ) {
62+ let l_next = futures_core:: ready!( self . as_mut( ) . l( ) . poll_next( cx) ) ;
63+ if let Some ( item) = l_next {
64+ * self . as_mut ( ) . l_cache ( ) = Some ( item) ;
65+ }
66+ }
67+
68+ if !self . r . done && self . as_mut ( ) . r_cache . is_none ( ) {
69+ let r_next = futures_core:: ready!( self . as_mut( ) . r( ) . poll_next( cx) ) ;
70+ if let Some ( item) = r_next {
71+ * self . as_mut ( ) . r_cache ( ) = Some ( item) ;
72+ }
73+ }
74+
75+ // Compare if both values are available.
76+ if self . as_mut ( ) . l_cache . is_some ( ) && self . as_mut ( ) . r_cache . is_some ( ) {
77+ let l_value = self . as_mut ( ) . l_cache ( ) . take ( ) . unwrap ( ) ;
78+ let r_value = self . as_mut ( ) . r_cache ( ) . take ( ) . unwrap ( ) ;
79+ let result = l_value. partial_cmp ( & r_value) ;
80+
81+ if let Some ( Ordering :: Equal ) = result {
82+ // Reset cache to prepare for next comparison
83+ * self . as_mut ( ) . l_cache ( ) = None ;
84+ * self . as_mut ( ) . r_cache ( ) = None ;
85+ } else {
86+ // Return non equal value
87+ return Poll :: Ready ( result) ;
88+ }
89+ }
90+ }
91+ }
92+ }
0 commit comments