@@ -220,6 +220,11 @@ export abstract class AbstractCursor<
220220 return this [ kId ] ?? undefined ;
221221 }
222222
223+ /** @internal */
224+ get isDead ( ) {
225+ return ( this [ kId ] ?. isZero ( ) ?? false ) || this [ kClosed ] || this [ kKilled ] ;
226+ }
227+
223228 /** @internal */
224229 get client ( ) : MongoClient {
225230 return this [ kClient ] ;
@@ -671,7 +676,7 @@ export abstract class AbstractCursor<
671676 return cleanupCursor ( this , { error } , ( ) => callback ( error , undefined ) ) ;
672677 }
673678
674- if ( cursorIsDead ( this ) ) {
679+ if ( this . isDead ) {
675680 return cleanupCursor ( this , undefined , ( ) => callback ( ) ) ;
676681 }
677682
@@ -701,96 +706,82 @@ async function next<T>(
701706 transform : boolean ;
702707 }
703708) : Promise < T | null > {
704- const cursorId = cursor [ kId ] ;
705709 if ( cursor . closed ) {
706710 return null ;
707711 }
708712
709- if ( cursor [ kDocuments ] . length !== 0 ) {
710- const doc = cursor [ kDocuments ] . shift ( ) ;
713+ do {
714+ if ( cursor [ kId ] == null ) {
715+ // All cursors must operate within a session, one must be made implicitly if not explicitly provided
716+ await promisify ( cursor [ kInit ] . bind ( cursor ) ) ( ) ;
717+ }
718+
719+ if ( cursor [ kDocuments ] . length !== 0 ) {
720+ const doc = cursor [ kDocuments ] . shift ( ) ;
711721
712- if ( doc != null && transform && cursor [ kTransform ] ) {
713- try {
714- return cursor [ kTransform ] ( doc ) ;
715- } catch ( error ) {
716- await cleanupCursorAsync ( cursor , { error, needsToEmitClosed : true } ) . catch ( ( ) => {
722+ if ( doc != null && transform && cursor [ kTransform ] ) {
723+ try {
724+ return cursor [ kTransform ] ( doc ) ;
725+ } catch ( error ) {
717726 // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718727 // error instead.
719- } ) ;
720- throw error ;
728+ await cleanupCursorAsync ( cursor , { error, needsToEmitClosed : true } ) . catch ( ( ) => null ) ;
729+ throw error ;
730+ }
721731 }
722- }
723732
724- return doc ;
725- }
733+ return doc ;
734+ }
726735
727- if ( cursorId == null ) {
728- // All cursors must operate within a session, one must be made implicitly if not explicitly provided
729- const init = promisify ( cb => cursor [ kInit ] ( cb ) ) ;
730- await init ( ) ;
731- return next ( cursor , { blocking, transform } ) ;
732- }
736+ if ( cursor . isDead ) {
737+ // if the cursor is dead, we clean it up
738+ // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
739+ // and we should surface the error
740+ await cleanupCursorAsync ( cursor , { } ) ;
741+ return null ;
742+ }
733743
734- if ( cursorIsDead ( cursor ) ) {
735- // if the cursor is dead, we clean it up
736- // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737- // and we should surface the error
738- await cleanupCursorAsync ( cursor , { } ) ;
739- return null ;
740- }
744+ // otherwise need to call getMore
745+ const batchSize = cursor [ kOptions ] . batchSize || 1000 ;
741746
742- // otherwise need to call getMore
743- const batchSize = cursor [ kOptions ] . batchSize || 1000 ;
744- const getMore = promisify ( ( batchSize : number , cb : Callback < Document | undefined > ) =>
745- cursor . _getMore ( batchSize , cb )
746- ) ;
747-
748- let response : Document | undefined ;
749- try {
750- response = await getMore ( batchSize ) ;
751- } catch ( error ) {
752- if ( error ) {
753- await cleanupCursorAsync ( cursor , { error } ) . catch ( ( ) => {
754- // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755- // error instead.
756- } ) ;
747+ try {
748+ const response = await promisify ( cursor . _getMore . bind ( cursor ) ) ( batchSize ) ;
749+
750+ if ( response ) {
751+ const cursorId =
752+ typeof response . cursor . id === 'number'
753+ ? Long . fromNumber ( response . cursor . id )
754+ : typeof response . cursor . id === 'bigint'
755+ ? Long . fromBigInt ( response . cursor . id )
756+ : response . cursor . id ;
757+
758+ cursor [ kDocuments ] . pushMany ( response . cursor . nextBatch ) ;
759+ cursor [ kId ] = cursorId ;
760+ }
761+ } catch ( error ) {
762+ // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
763+ // error instead.
764+ await cleanupCursorAsync ( cursor , { error } ) . catch ( ( ) => null ) ;
757765 throw error ;
758766 }
759- }
760-
761- if ( response ) {
762- const cursorId =
763- typeof response . cursor . id === 'number'
764- ? Long . fromNumber ( response . cursor . id )
765- : typeof response . cursor . id === 'bigint'
766- ? Long . fromBigInt ( response . cursor . id )
767- : response . cursor . id ;
768-
769- cursor [ kDocuments ] . pushMany ( response . cursor . nextBatch ) ;
770- cursor [ kId ] = cursorId ;
771- }
772-
773- if ( cursorIsDead ( cursor ) ) {
774- // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775- // we intentionally clean up the cursor to release its session back into the pool before the cursor
776- // is iterated. This prevents a cursor that is exhausted on the server from holding
777- // onto a session indefinitely until the AbstractCursor is iterated.
778- //
779- // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780- // and we should surface the error
781- await cleanupCursorAsync ( cursor , { } ) ;
782- }
783767
784- if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
785- return null ;
786- }
768+ if ( cursor . isDead ) {
769+ // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
770+ // we intentionally clean up the cursor to release its session back into the pool before the cursor
771+ // is iterated. This prevents a cursor that is exhausted on the server from holding
772+ // onto a session indefinitely until the AbstractCursor is iterated.
773+ //
774+ // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
775+ // and we should surface the error
776+ await cleanupCursorAsync ( cursor , { } ) ;
777+ }
787778
788- return next ( cursor , { blocking, transform } ) ;
789- }
779+ if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
780+ return null ;
781+ }
782+ } while ( ! cursor . isDead || cursor [ kDocuments ] . length !== 0 ) ;
790783
791- function cursorIsDead ( cursor : AbstractCursor ) : boolean {
792- const cursorId = cursor [ kId ] ;
793- return ! ! cursorId && cursorId . isZero ( ) ;
784+ return null ;
794785}
795786
796787const cleanupCursorAsync = promisify ( cleanupCursor ) ;
0 commit comments