@@ -17,8 +17,7 @@ import {
1717} from '../constants' ;
1818import {
1919 type AnyError ,
20- MONGODB_ERROR_CODES ,
21- MongoError ,
20+ type MongoError ,
2221 MongoInvalidArgumentError ,
2322 MongoMissingCredentialsError ,
2423 MongoNetworkError ,
@@ -27,7 +26,14 @@ import {
2726} from '../error' ;
2827import { CancellationToken , TypedEventEmitter } from '../mongo_types' ;
2928import type { Server } from '../sdam/server' ;
30- import { type Callback , eachAsync , List , makeCounter , TimeoutController } from '../utils' ;
29+ import {
30+ type Callback ,
31+ eachAsync ,
32+ List ,
33+ makeCounter ,
34+ promiseWithResolvers ,
35+ TimeoutController
36+ } from '../utils' ;
3137import { connect } from './connect' ;
3238import { Connection , type ConnectionEvents , type ConnectionOptions } from './connection' ;
3339import {
@@ -100,7 +106,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
100106
101107/** @internal */
102108export interface WaitQueueMember {
103- callback : Callback < Connection > ;
109+ resolve : ( conn : Connection ) => void ;
110+ reject : ( err : AnyError ) => void ;
104111 timeoutController : TimeoutController ;
105112 [ kCancelled ] ?: boolean ;
106113}
@@ -350,16 +357,18 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
350357 * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
351358 * explicitly destroyed by the new owner.
352359 */
353- checkOut ( callback : Callback < Connection > ) : void {
360+ async checkOut ( ) : Promise < Connection > {
354361 this . emitAndLog (
355362 ConnectionPool . CONNECTION_CHECK_OUT_STARTED ,
356363 new ConnectionCheckOutStartedEvent ( this )
357364 ) ;
358365
359366 const waitQueueTimeoutMS = this . options . waitQueueTimeoutMS ;
360367
368+ const { promise, resolve, reject } = promiseWithResolvers < Connection > ( ) ;
361369 const waitQueueMember : WaitQueueMember = {
362- callback,
370+ resolve,
371+ reject,
363372 timeoutController : new TimeoutController ( waitQueueTimeoutMS )
364373 } ;
365374 waitQueueMember . timeoutController . signal . addEventListener ( 'abort' , ( ) => {
@@ -370,7 +379,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
370379 ConnectionPool . CONNECTION_CHECK_OUT_FAILED ,
371380 new ConnectionCheckOutFailedEvent ( this , 'timeout' )
372381 ) ;
373- waitQueueMember . callback (
382+ waitQueueMember . reject (
374383 new WaitQueueTimeoutError (
375384 this . loadBalanced
376385 ? this . waitQueueErrorMetrics ( )
@@ -382,6 +391,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
382391
383392 this [ kWaitQueue ] . push ( waitQueueMember ) ;
384393 process . nextTick ( ( ) => this . processWaitQueue ( ) ) ;
394+
395+ return promise ;
385396 }
386397
387398 /**
@@ -534,115 +545,35 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
534545 }
535546
536547 /**
537- * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
538- * has completed by calling back.
539- *
540- * NOTE: please note the required signature of `fn`
541- *
542- * @remarks When in load balancer mode, connections can be pinned to cursors or transactions.
543- * In these cases we pass the connection in to this method to ensure it is used and a new
544- * connection is not checked out.
545- *
546- * @param conn - A pinned connection for use in load balancing mode.
547- * @param fn - A function which operates on a managed connection
548- * @param callback - The original callback
549- */
550- withConnection (
551- conn : Connection | undefined ,
552- fn : WithConnectionCallback ,
553- callback : Callback < Connection >
554- ) : void {
555- if ( conn ) {
556- // use the provided connection, and do _not_ check it in after execution
557- fn ( undefined , conn , ( fnErr , result ) => {
558- if ( fnErr ) {
559- return this . withReauthentication ( fnErr , conn , fn , callback ) ;
560- }
561- callback ( undefined , result ) ;
562- } ) ;
563- return ;
564- }
565-
566- this . checkOut ( ( err , conn ) => {
567- // don't callback with `err` here, we might want to act upon it inside `fn`
568- fn ( err as MongoError , conn , ( fnErr , result ) => {
569- if ( fnErr ) {
570- if ( conn ) {
571- this . withReauthentication ( fnErr , conn , fn , callback ) ;
572- } else {
573- callback ( fnErr ) ;
574- }
575- } else {
576- callback ( undefined , result ) ;
577- }
578-
579- if ( conn ) {
580- this . checkIn ( conn ) ;
581- }
582- } ) ;
583- } ) ;
584- }
585-
586- private withReauthentication (
587- fnErr : AnyError ,
588- conn : Connection ,
589- fn : WithConnectionCallback ,
590- callback : Callback < Connection >
591- ) {
592- if ( fnErr instanceof MongoError && fnErr . code === MONGODB_ERROR_CODES . Reauthenticate ) {
593- this . reauthenticate ( conn , fn , ( error , res ) => {
594- if ( error ) {
595- return callback ( error ) ;
596- }
597- callback ( undefined , res ) ;
598- } ) ;
599- } else {
600- callback ( fnErr ) ;
601- }
602- }
603-
604- /**
605- * Reauthenticate on the same connection and then retry the operation.
548+ * @internal
549+ * Reauthenticate a connection
606550 */
607- private reauthenticate (
608- connection : Connection ,
609- fn : WithConnectionCallback ,
610- callback : Callback
611- ) : void {
551+ async reauthenticate ( connection : Connection ) : Promise < void > {
612552 const authContext = connection . authContext ;
613553 if ( ! authContext ) {
614- return callback ( new MongoRuntimeError ( 'No auth context found on connection.' ) ) ;
554+ throw new MongoRuntimeError ( 'No auth context found on connection.' ) ;
615555 }
616556 const credentials = authContext . credentials ;
617557 if ( ! credentials ) {
618- return callback (
619- new MongoMissingCredentialsError (
620- 'Connection is missing credentials when asked to reauthenticate'
621- )
558+ throw new MongoMissingCredentialsError (
559+ 'Connection is missing credentials when asked to reauthenticate'
622560 ) ;
623561 }
562+
624563 const resolvedCredentials = credentials . resolveAuthMechanism ( connection . hello ) ;
625564 const provider = this [ kServer ] . topology . client . s . authProviders . getOrCreateProvider (
626565 resolvedCredentials . mechanism
627566 ) ;
567+
628568 if ( ! provider ) {
629- return callback (
630- new MongoMissingCredentialsError (
631- `Reauthenticate failed due to no auth provider for ${ credentials . mechanism } `
632- )
569+ throw new MongoMissingCredentialsError (
570+ `Reauthenticate failed due to no auth provider for ${ credentials . mechanism } `
633571 ) ;
634572 }
635- provider . reauth ( authContext ) . then (
636- ( ) => {
637- fn ( undefined , connection , ( fnErr , fnResult ) => {
638- if ( fnErr ) {
639- return callback ( fnErr ) ;
640- }
641- callback ( undefined , fnResult ) ;
642- } ) ;
643- } ,
644- error => callback ( error )
645- ) ;
573+
574+ await provider . reauth ( authContext ) ;
575+
576+ return ;
646577 }
647578
648579 /** Clear the min pool size timer */
@@ -841,7 +772,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
841772 ) ;
842773 waitQueueMember . timeoutController . clear ( ) ;
843774 this [ kWaitQueue ] . shift ( ) ;
844- waitQueueMember . callback ( error ) ;
775+ waitQueueMember . reject ( error ) ;
845776 continue ;
846777 }
847778
@@ -863,7 +794,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
863794 waitQueueMember . timeoutController . clear ( ) ;
864795
865796 this [ kWaitQueue ] . shift ( ) ;
866- waitQueueMember . callback ( undefined , connection ) ;
797+ waitQueueMember . resolve ( connection ) ;
867798 }
868799 }
869800
@@ -889,16 +820,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
889820 // TODO(NODE-5192): Remove this cast
890821 new ConnectionCheckOutFailedEvent ( this , 'connectionError' , err as MongoError )
891822 ) ;
823+ waitQueueMember . reject ( err ) ;
892824 } else if ( connection ) {
893825 this [ kCheckedOut ] . add ( connection ) ;
894826 this . emitAndLog (
895827 ConnectionPool . CONNECTION_CHECKED_OUT ,
896828 new ConnectionCheckedOutEvent ( this , connection )
897829 ) ;
830+ waitQueueMember . resolve ( connection ) ;
898831 }
899832
900833 waitQueueMember . timeoutController . clear ( ) ;
901- waitQueueMember . callback ( err , connection ) ;
902834 }
903835 process . nextTick ( ( ) => this . processWaitQueue ( ) ) ;
904836 } ) ;
0 commit comments