11import { strict as assert } from 'assert' ;
2- import { UUID } from 'bson' ;
2+ import { Long , UUID } from 'bson' ;
33import { expect } from 'chai' ;
44import { on , once } from 'events' ;
55import { gte , lt } from 'semver' ;
@@ -11,19 +11,16 @@ import {
1111 type ChangeStream ,
1212 type ChangeStreamDocument ,
1313 type ChangeStreamOptions ,
14- type Collection ,
15- type CommandStartedEvent ,
16- type Db ,
17- isHello ,
18- LEGACY_HELLO_COMMAND ,
19- Long ,
20- MongoAPIError ,
21- MongoChangeStreamError ,
22- type MongoClient ,
23- MongoServerError ,
24- ReadPreference ,
2514 type ResumeToken
26- } from '../../mongodb' ;
15+ } from '../../../src/change_stream' ;
16+ import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events' ;
17+ import { type Collection } from '../../../src/collection' ;
18+ import { LEGACY_HELLO_COMMAND } from '../../../src/constants' ;
19+ import { type Db } from '../../../src/db' ;
20+ import { MongoAPIError , MongoChangeStreamError , MongoServerError } from '../../../src/error' ;
21+ import { type MongoClient } from '../../../src/mongo_client' ;
22+ import { ReadPreference } from '../../../src/read_preference' ;
23+ import { isHello } from '../../../src/utils' ;
2724import * as mock from '../../tools/mongodb-mock/index' ;
2825import { TestBuilder , UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder' ;
2926import { type FailCommandFailPoint , sleep } from '../../tools/utils' ;
@@ -323,30 +320,25 @@ describe('Change Streams', function () {
323320 it ( 'should properly close ChangeStream cursor' , {
324321 metadata : { requires : { topology : 'replicaset' } } ,
325322
326- test : function ( done ) {
323+ test : async function ( ) {
327324 const configuration = this . configuration ;
328325 const client = configuration . newClient ( ) ;
329326
330- client . connect ( ( err , client ) => {
331- expect ( err ) . to . not . exist ;
332- this . defer ( ( ) => client . close ( ) ) ;
327+ await client . connect ( ) ;
328+ const database = client . db ( 'integration_tests' ) ;
329+ const changeStream = database . collection ( 'changeStreamCloseTest' ) . watch ( pipeline ) ;
333330
334- const database = client . db ( 'integration_tests' ) ;
335- const changeStream = database . collection ( 'changeStreamCloseTest' ) . watch ( pipeline ) ;
336- this . defer ( ( ) => changeStream . close ( ) ) ;
331+ assert . equal ( changeStream . closed , false ) ;
332+ assert . equal ( changeStream . cursor . closed , false ) ;
337333
338- assert . equal ( changeStream . closed , false ) ;
339- assert . equal ( changeStream . cursor . closed , false ) ;
334+ await changeStream . close ( ) ;
340335
341- changeStream . close ( err => {
342- expect ( err ) . to . not . exist ;
336+ // Check the cursor is closed
337+ expect ( changeStream . closed ) . to . be . true ;
338+ expect ( changeStream . cursor ) . property ( 'closed' , true ) ;
343339
344- // Check the cursor is closed
345- expect ( changeStream . closed ) . to . be . true ;
346- expect ( changeStream . cursor ) . property ( 'closed' , true ) ;
347- done ( ) ;
348- } ) ;
349- } ) ;
340+ await changeStream . close ( ) ;
341+ await client . close ( ) ;
350342 }
351343 } ) ;
352344
@@ -355,32 +347,28 @@ describe('Change Streams', function () {
355347 {
356348 metadata : { requires : { topology : 'replicaset' } } ,
357349
358- test : function ( done ) {
350+ test : async function ( ) {
359351 const configuration = this . configuration ;
360352 const client = configuration . newClient ( ) ;
361353
362- client . connect ( ( err , client ) => {
363- expect ( err ) . to . not . exist ;
364- this . defer ( ( ) => client . close ( ) ) ;
354+ await client . connect ( ) ;
365355
366- const forbiddenStage = { } ;
367- const forbiddenStageName = '$alksdjfhlaskdfjh' ;
368- forbiddenStage [ forbiddenStageName ] = 2 ;
356+ const forbiddenStage = { } ;
357+ const forbiddenStageName = '$alksdjfhlaskdfjh' ;
358+ forbiddenStage [ forbiddenStageName ] = 2 ;
369359
370- const database = client . db ( 'integration_tests' ) ;
371- const changeStream = database . collection ( 'forbiddenStageTest' ) . watch ( [ forbiddenStage ] ) ;
372- this . defer ( ( ) => changeStream . close ( ) ) ;
360+ const database = client . db ( 'integration_tests' ) ;
361+ const changeStream = database . collection ( 'forbiddenStageTest' ) . watch ( [ forbiddenStage ] ) ;
373362
374- changeStream . next ( err => {
375- assert . ok ( err ) ;
376- assert . ok ( err . message ) ;
377- assert . ok (
378- err . message . indexOf ( `Unrecognized pipeline stage name: '${ forbiddenStageName } '` ) > - 1
379- ) ;
363+ const err = await changeStream . next ( ) . catch ( e => e ) ;
364+ assert . ok ( err ) ;
365+ assert . ok ( err . message ) ;
366+ assert . ok (
367+ err . message . indexOf ( `Unrecognized pipeline stage name: '${ forbiddenStageName } '` ) > - 1
368+ ) ;
380369
381- done ( ) ;
382- } ) ;
383- } ) ;
370+ await changeStream . close ( ) ;
371+ await client . close ( ) ;
384372 }
385373 }
386374 ) ;
@@ -459,37 +447,25 @@ describe('Change Streams', function () {
459447
460448 it ( 'should error if resume token projected out of change stream document using iterator' , {
461449 metadata : { requires : { topology : 'replicaset' } } ,
462- test ( done ) {
450+ async test ( ) {
463451 const configuration = this . configuration ;
464452 const client = configuration . newClient ( ) ;
465453
466- client . connect ( ( err , client ) => {
467- expect ( err ) . to . not . exist ;
454+ await client . connect ( ) ;
468455
469- const database = client . db ( 'integration_tests' ) ;
470- const collection = database . collection ( 'resumetokenProjectedOutCallback' ) ;
471- const changeStream = collection . watch ( [ { $project : { _id : false } } ] ) ;
456+ const database = client . db ( 'integration_tests' ) ;
457+ const collection = database . collection ( 'resumetokenProjectedOutCallback' ) ;
458+ const changeStream = collection . watch ( [ { $project : { _id : false } } ] ) ;
472459
473- changeStream . hasNext ( ( ) => {
474- // trigger initialize
475- } ) ;
460+ await initIteratorMode ( changeStream ) ;
476461
477- changeStream . cursor . on ( 'init' , ( ) => {
478- collection . insertOne ( { b : 2 } , ( err , res ) => {
479- expect ( err ) . to . be . undefined ;
480- expect ( res ) . to . exist ;
481-
482- changeStream . next ( err => {
483- expect ( err ) . to . exist ;
484- changeStream . close ( ( ) => {
485- client . close ( ( ) => {
486- done ( ) ;
487- } ) ;
488- } ) ;
489- } ) ;
490- } ) ;
491- } ) ;
492- } ) ;
462+ const res = await collection . insertOne ( { b : 2 } ) ;
463+ expect ( res ) . to . exist ;
464+
465+ const err = await changeStream . next ( ) . catch ( e => e ) ;
466+ expect ( err ) . to . exist ;
467+ await changeStream . close ( ) ;
468+ await client . close ( ) ;
493469 }
494470 } ) ;
495471
@@ -1291,7 +1267,7 @@ describe('Change Streams', function () {
12911267 await mock . cleanup ( ) ;
12921268 } ) ;
12931269
1294- it ( 'changeStream should close if cursor id for initial aggregate is Long.ZERO' , function ( done ) {
1270+ it ( 'changeStream should close if cursor id for initial aggregate is Long.ZERO' , async function ( ) {
12951271 mockServer . setMessageHandler ( req => {
12961272 const doc = req . document ;
12971273 if ( isHello ( doc ) ) {
@@ -1320,17 +1296,16 @@ describe('Change Streams', function () {
13201296 const client = this . configuration . newClient ( `mongodb://${ mockServer . uri ( ) } /` , {
13211297 serverApi : null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
13221298 } ) ;
1323- client . connect ( err => {
1324- expect ( err ) . to . not . exist ;
1325- const collection = client . db ( 'cs' ) . collection ( 'test' ) ;
1326- const changeStream = collection . watch ( ) ;
1327- changeStream . next ( ( err , doc ) => {
1328- expect ( err ) . to . exist ;
1329- expect ( doc ) . to . not . exist ;
1330- expect ( err ?. message ) . to . equal ( 'ChangeStream is closed' ) ;
1331- changeStream . close ( ( ) => client . close ( done ) ) ;
1332- } ) ;
1333- } ) ;
1299+ await client . connect ( ) ;
1300+ const collection = client . db ( 'cs' ) . collection ( 'test' ) ;
1301+ const changeStream = collection . watch ( ) ;
1302+
1303+ const err = await changeStream . next ( ) . catch ( e => e ) ;
1304+ expect ( err ) . to . exist ;
1305+ expect ( err ?. message ) . to . equal ( 'ChangeStream is closed' ) ;
1306+
1307+ await changeStream . close ( ) ;
1308+ await client . close ( ) ;
13341309 } ) ;
13351310 } ) ;
13361311 } ) ;
0 commit comments