@@ -29,14 +29,14 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929import type { ReadPreferenceLike } from '../read_preference' ;
3030import { applySession , type ClientSession , updateSessionFromResponse } from '../sessions' ;
3131import {
32- abortable ,
3332 BufferPool ,
3433 calculateDurationInMs ,
3534 type Callback ,
3635 HostAddress ,
3736 maxWireVersion ,
3837 type MongoDBNamespace ,
3938 now ,
39+ promiseWithResolvers ,
4040 uuidV4
4141} from '../utils' ;
4242import type { WriteConcern } from '../write_concern' ;
@@ -161,15 +161,14 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
161161export class Connection extends TypedEventEmitter < ConnectionEvents > {
162162 public id : number | '<monitor>' ;
163163 public address : string ;
164- public lastHelloMS ?: number ;
164+ public lastHelloMS = - 1 ;
165165 public serverApi ?: ServerApi ;
166- public helloOk ?: boolean ;
166+ public helloOk = false ;
167167 public authContext ?: AuthContext ;
168168 public delayedTimeoutId : NodeJS . Timeout | null = null ;
169169 public generation : number ;
170170 public readonly description : Readonly < StreamDescription > ;
171171 /**
172- * @public
173172 * Represents if the connection has been established:
174173 * - TCP handshake
175174 * - TLS negotiated
@@ -180,15 +179,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
180179 public established : boolean ;
181180
182181 private lastUseTime : number ;
183- private socketTimeoutMS : number ;
184- private monitorCommands : boolean ;
185- private socket : Stream ;
186- private controller : AbortController ;
187- private messageStream : Readable ;
188- private socketWrite : ( buffer : Uint8Array ) => Promise < void > ;
189182 private clusterTime : Document | null = null ;
190- /** @internal */
191- override mongoLogger : MongoLogger | undefined ;
183+
184+ private readonly socketTimeoutMS : number ;
185+ private readonly monitorCommands : boolean ;
186+ private readonly socket : Stream ;
187+ private readonly controller : AbortController ;
188+ private readonly signal : AbortSignal ;
189+ private readonly messageStream : Readable ;
190+ private readonly socketWrite : ( buffer : Uint8Array ) => Promise < void > ;
191+ private readonly aborted : Promise < never > ;
192192
193193 /** @event */
194194 static readonly COMMAND_STARTED = COMMAND_STARTED ;
@@ -221,7 +221,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
221221 this . lastUseTime = now ( ) ;
222222
223223 this . socket = stream ;
224+
225+ // TODO: Remove signal from connection layer
224226 this . controller = new AbortController ( ) ;
227+ const { signal } = this . controller ;
228+ this . signal = signal ;
229+ const { promise : aborted , reject } = promiseWithResolvers < never > ( ) ;
230+ aborted . then ( undefined , ( ) => null ) ; // Prevent unhandled rejection
231+ this . signal . addEventListener (
232+ 'abort' ,
233+ function onAbort ( ) {
234+ reject ( signal . reason ) ;
235+ } ,
236+ { once : true }
237+ ) ;
238+ this . aborted = aborted ;
225239
226240 this . messageStream = this . socket
227241 . on ( 'error' , this . onError . bind ( this ) )
@@ -232,13 +246,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
232246
233247 const socketWrite = promisify ( this . socket . write . bind ( this . socket ) ) ;
234248 this . socketWrite = async buffer => {
235- return abortable ( socketWrite ( buffer ) , { signal : this . controller . signal } ) ;
249+ return Promise . race ( [ socketWrite ( buffer ) , this . aborted ] ) ;
236250 } ;
237251 }
238252
239253 /** Indicates that the connection (including underlying TCP socket) has been closed. */
240254 public get closed ( ) : boolean {
241- return this . controller . signal . aborted ;
255+ return this . signal . aborted ;
242256 }
243257
244258 public get hello ( ) {
@@ -407,7 +421,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
407421 }
408422
409423 private async * sendWire ( message : WriteProtocolMessageType , options : CommandOptions ) {
410- this . controller . signal . throwIfAborted ( ) ;
424+ this . throwIfAborted ( ) ;
411425
412426 if ( typeof options . socketTimeoutMS === 'number' ) {
413427 this . socket . setTimeout ( options . socketTimeoutMS ) ;
@@ -426,7 +440,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
426440 return ;
427441 }
428442
429- this . controller . signal . throwIfAborted ( ) ;
443+ this . throwIfAborted ( ) ;
430444
431445 for await ( const response of this . readMany ( ) ) {
432446 this . socket . setTimeout ( 0 ) ;
@@ -447,7 +461,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
447461 }
448462
449463 yield document ;
450- this . controller . signal . throwIfAborted ( ) ;
464+ this . throwIfAborted ( ) ;
451465
452466 if ( typeof options . socketTimeoutMS === 'number' ) {
453467 this . socket . setTimeout ( options . socketTimeoutMS ) ;
@@ -481,7 +495,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
481495
482496 let document ;
483497 try {
484- this . controller . signal . throwIfAborted ( ) ;
498+ this . throwIfAborted ( ) ;
485499 for await ( document of this . sendWire ( message , options ) ) {
486500 if ( ! Buffer . isBuffer ( document ) && document . writeConcernError ) {
487501 throw new MongoWriteConcernError ( document . writeConcernError , document ) ;
@@ -511,7 +525,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
511525 }
512526
513527 yield document ;
514- this . controller . signal . throwIfAborted ( ) ;
528+ this . throwIfAborted ( ) ;
515529 }
516530 } catch ( error ) {
517531 if ( this . shouldEmitAndLogCommand ) {
@@ -554,7 +568,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
554568 command : Document ,
555569 options : CommandOptions = { }
556570 ) : Promise < Document > {
557- this . controller . signal . throwIfAborted ( ) ;
571+ this . throwIfAborted ( ) ;
558572 for await ( const document of this . sendCommand ( ns , command , options ) ) {
559573 return document ;
560574 }
@@ -568,16 +582,20 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
568582 replyListener : Callback
569583 ) {
570584 const exhaustLoop = async ( ) => {
571- this . controller . signal . throwIfAborted ( ) ;
585+ this . throwIfAborted ( ) ;
572586 for await ( const reply of this . sendCommand ( ns , command , options ) ) {
573587 replyListener ( undefined , reply ) ;
574- this . controller . signal . throwIfAborted ( ) ;
588+ this . throwIfAborted ( ) ;
575589 }
576590 throw new MongoUnexpectedServerResponseError ( 'Server ended moreToCome unexpectedly' ) ;
577591 } ;
578592 exhaustLoop ( ) . catch ( replyListener ) ;
579593 }
580594
595+ private throwIfAborted ( ) {
596+ this . signal . throwIfAborted ( ) ;
597+ }
598+
581599 /**
582600 * @internal
583601 *
@@ -611,7 +629,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
611629 * Note that `for-await` loops call `return` automatically when the loop is exited.
612630 */
613631 private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpQueryResponse > {
614- for await ( const message of onData ( this . messageStream , { signal : this . controller . signal } ) ) {
632+ for await ( const message of onData ( this . messageStream , { signal : this . signal } ) ) {
615633 const response = await decompressResponse ( message ) ;
616634 yield response ;
617635
0 commit comments