@@ -24,6 +24,16 @@ use crate::{
2424 Namespace ,
2525} ;
2626
27+ /// The result of one attempt to advance a cursor.
28+ pub ( super ) enum AdvanceResult {
29+ /// The cursor was successfully advanced and the buffer has at least one item.
30+ Advanced ,
31+ /// The cursor does not have any more items and will not return any more in the future.
32+ Exhausted ,
33+ /// The cursor does not currently have any items, but future calls to getMore may yield more.
34+ Waiting ,
35+ }
36+
2737/// An internal cursor that can be used in a variety of contexts depending on its `GetMoreProvider`.
2838#[ derive( Derivative ) ]
2939#[ derivative( Debug ) ]
@@ -95,33 +105,44 @@ where
95105 self . state . as_ref ( ) . unwrap ( )
96106 }
97107
98- /// Advance the cursor forward to the next document.
99- /// If there are no documents cached locally, perform getMores until
100- /// the cursor is exhausted or a result/error has been received .
108+ /// Attempt to advance the cursor forward to the next item. If there are no items cached
109+ /// locally, perform getMores until the cursor is exhausted or the buffer has been refilled.
110+ /// Return whether or not the cursor has been advanced .
101111 pub ( super ) async fn advance ( & mut self ) -> Result < bool > {
102112 loop {
103- self . state_mut ( ) . buffer . advance ( ) ;
104-
105- if ! self . state ( ) . buffer . is_empty ( ) {
106- break ;
113+ match self . try_advance ( ) . await ? {
114+ AdvanceResult :: Advanced => return Ok ( true ) ,
115+ AdvanceResult :: Exhausted => return Ok ( false ) ,
116+ AdvanceResult :: Waiting => continue ,
107117 }
118+ }
119+ }
108120
109- // if moving the offset puts us at the end of the buffer, perform another
110- // getMore if the cursor is still alive.
121+ /// Attempt to advance the cursor forward to the next item. If there are no items cached
122+ /// locally, perform a single getMore to attempt to retrieve more.
123+ pub ( super ) async fn try_advance ( & mut self ) -> Result < AdvanceResult > {
124+ if self . state_mut ( ) . buffer . advance ( ) {
125+ return Ok ( AdvanceResult :: Advanced ) ;
126+ } else if self . is_exhausted ( ) {
127+ return Ok ( AdvanceResult :: Exhausted ) ;
128+ }
111129
112- if self . state ( ) . exhausted {
113- return Ok ( false ) ;
114- }
130+ // If the buffer is empty but the cursor is not exhausted, perform a getMore.
131+ let client = self . client . clone ( ) ;
132+ let spec = self . info . clone ( ) ;
133+ let pin = self . state ( ) . pinned_connection . replicate ( ) ;
115134
116- let client = self . client . clone ( ) ;
117- let spec = self . info . clone ( ) ;
118- let pin = self . state ( ) . pinned_connection . replicate ( ) ;
135+ let result = self . provider . execute ( spec, client, pin) . await ;
136+ self . handle_get_more_result ( result) ?;
119137
120- let result = self . provider . execute ( spec, client, pin) . await ;
121- self . handle_get_more_result ( result) ?;
138+ if self . is_exhausted ( ) {
139+ Ok ( AdvanceResult :: Exhausted )
140+ } else {
141+ match self . state_mut ( ) . buffer . advance ( ) {
142+ true => Ok ( AdvanceResult :: Advanced ) ,
143+ false => Ok ( AdvanceResult :: Waiting ) ,
144+ }
122145 }
123-
124- Ok ( true )
125146 }
126147
127148 pub ( super ) fn take_state ( & mut self ) -> CursorState {
@@ -494,21 +515,27 @@ impl CursorBuffer {
494515 self . docs . is_empty ( )
495516 }
496517
518+ /// Removes and returns the document in the front of the buffer.
497519 pub ( crate ) fn next ( & mut self ) -> Option < RawDocumentBuf > {
498520 self . fresh = false ;
499521 self . docs . pop_front ( )
500522 }
501523
502- pub ( crate ) fn advance ( & mut self ) {
503- // if at the front of the buffer, don't move forward as the first document
504- // hasn't been consumed yet.
524+ /// Advances the buffer to the next document. Returns whether there are any documents remaining
525+ /// in the buffer after advancing.
526+ pub ( crate ) fn advance ( & mut self ) -> bool {
527+ // If at the front of the buffer, don't move forward as the first document hasn't been
528+ // consumed yet.
505529 if self . fresh {
506530 self . fresh = false ;
507- return ;
531+ } else {
532+ self . docs . pop_front ( ) ;
508533 }
509- self . next ( ) ;
534+ ! self . is_empty ( )
510535 }
511536
537+ /// Returns the item at the front of the buffer, if there is one. This method does not change
538+ /// the state of the buffer.
512539 pub ( crate ) fn current ( & self ) -> Option < & RawDocument > {
513540 self . docs . front ( ) . map ( |d| d. as_ref ( ) )
514541 }
@@ -519,3 +546,24 @@ impl AsRef<VecDeque<RawDocumentBuf>> for CursorBuffer {
519546 & self . docs
520547 }
521548}
549+
550+ #[ test]
551+ fn test_buffer ( ) {
552+ use bson:: rawdoc;
553+
554+ let queue: VecDeque < RawDocumentBuf > =
555+ [ rawdoc ! { "x" : 1 } , rawdoc ! { "x" : 2 } , rawdoc ! { "x" : 3 } ] . into ( ) ;
556+ let mut buffer = CursorBuffer :: new ( queue) ;
557+
558+ assert ! ( buffer. advance( ) ) ;
559+ assert_eq ! ( buffer. current( ) , Some ( rawdoc! { "x" : 1 } . as_ref( ) ) ) ;
560+
561+ assert ! ( buffer. advance( ) ) ;
562+ assert_eq ! ( buffer. current( ) , Some ( rawdoc! { "x" : 2 } . as_ref( ) ) ) ;
563+
564+ assert ! ( buffer. advance( ) ) ;
565+ assert_eq ! ( buffer. current( ) , Some ( rawdoc! { "x" : 3 } . as_ref( ) ) ) ;
566+
567+ assert ! ( !buffer. advance( ) ) ;
568+ assert_eq ! ( buffer. current( ) , None ) ;
569+ }
0 commit comments