File tree Expand file tree Collapse file tree 4 files changed +51
-4
lines changed Expand file tree Collapse file tree 4 files changed +51
-4
lines changed Original file line number Diff line number Diff line change @@ -3,12 +3,16 @@ pub mod event;
33pub ( crate ) mod options;
44pub mod session;
55
6+ #[ cfg( test) ]
7+ use std:: collections:: VecDeque ;
68use std:: {
79 future:: Future ,
810 pin:: Pin ,
911 task:: { Context , Poll } ,
1012} ;
1113
14+ #[ cfg( test) ]
15+ use bson:: RawDocumentBuf ;
1216use bson:: { Document , Timestamp } ;
1317use derivative:: Derivative ;
1418use futures_core:: { future:: BoxFuture , Stream } ;
@@ -167,6 +171,11 @@ where
167171 pub ( crate ) fn set_kill_watcher ( & mut self , tx : oneshot:: Sender < ( ) > ) {
168172 self . cursor . set_kill_watcher ( tx) ;
169173 }
174+
175+ #[ cfg( test) ]
176+ pub ( crate ) fn current_batch ( & self ) -> & VecDeque < RawDocumentBuf > {
177+ self . cursor . current_batch ( )
178+ }
170179}
171180
172181/// Arguments passed to a `watch` method, captured to allow resume.
Original file line number Diff line number Diff line change 8787 self . state ( ) . buffer . current ( )
8888 }
8989
90+ #[ cfg( test) ]
91+ pub ( super ) fn current_batch ( & self ) -> & VecDeque < RawDocumentBuf > {
92+ self . state ( ) . buffer . as_ref ( )
93+ }
94+
9095 fn state_mut ( & mut self ) -> & mut CursorState {
9196 self . state . as_mut ( ) . unwrap ( )
9297 }
@@ -531,3 +536,9 @@ impl CursorBuffer {
531536 self . docs . front ( ) . map ( |d| d. as_ref ( ) )
532537 }
533538}
539+
540+ impl AsRef < VecDeque < RawDocumentBuf > > for CursorBuffer {
541+ fn as_ref ( & self ) -> & VecDeque < RawDocumentBuf > {
542+ & self . docs
543+ }
544+ }
Original file line number Diff line number Diff line change 11mod common;
22pub ( crate ) mod session;
33
4+ #[ cfg( test) ]
5+ use std:: collections:: VecDeque ;
46use std:: {
57 pin:: Pin ,
68 task:: { Context , Poll } ,
79} ;
810
911use bson:: RawDocument ;
12+
13+ #[ cfg( test) ]
14+ use bson:: RawDocumentBuf ;
1015use futures_core:: { future:: BoxFuture , Stream } ;
1116use serde:: { de:: DeserializeOwned , Deserialize } ;
1217#[ cfg( test) ]
@@ -269,6 +274,11 @@ impl<T> Cursor<T> {
269274 ) ;
270275 self . kill_watcher = Some ( tx) ;
271276 }
277+
278+ #[ cfg( test) ]
279+ pub ( crate ) fn current_batch ( & self ) -> & VecDeque < RawDocumentBuf > {
280+ self . wrapped_cursor . as_ref ( ) . unwrap ( ) . current_batch ( )
281+ }
272282}
273283
274284impl < T > CursorStream for Cursor < T >
Original file line number Diff line number Diff line change @@ -455,11 +455,28 @@ async fn batch_mid_resume_token() -> Result<()> {
455455 None => return Ok ( ( ) ) ,
456456 } ;
457457
458- coll. insert_many ( ( 0 ..2 ) . map ( |i| doc ! { "_id" : i as i32 } ) , None )
459- . await ?;
458+ // This loop gets the stream to a point where it has been iterated up to but not including
459+ // the last event in its batch.
460+ let mut event_id = None ;
461+ loop {
462+ match stream. next_if_any ( ) . await ? {
463+ Some ( event) => {
464+ event_id = Some ( event. id ) ;
465+ }
466+ // If we're out of events, make some more.
467+ None => {
468+ coll. insert_many ( ( 0 ..3 ) . map ( |_| doc ! { } ) , None ) . await ?;
469+ }
470+ } ;
471+
472+ // if after iterating the stream last time there's one document left,
473+ // then we're done here and can continue to the assertions.
474+ if stream. current_batch ( ) . len ( ) == 1 {
475+ break ;
476+ }
477+ }
460478
461- let mid_id = stream. next ( ) . await . transpose ( ) ?. unwrap ( ) . id ;
462- assert_eq ! ( stream. resume_token( ) , Some ( mid_id) ) ;
479+ assert_eq ! ( stream. resume_token( ) . unwrap( ) , event_id. unwrap( ) ) ;
463480
464481 Ok ( ( ) )
465482}
You can’t perform that action at this time.
0 commit comments