@@ -40,8 +40,6 @@ namespace MongoDB.Driver.Core.Connections
4040 internal class BinaryConnection : IConnection
4141 {
4242 // fields
43- private readonly CancellationToken _backgroundTaskCancellationToken ;
44- private readonly CancellationTokenSource _backgroundTaskCancellationTokenSource ;
4543 private readonly CommandEventHelper _commandEventHelper ;
4644 private readonly ICompressorSource _compressorSource ;
4745 private ConnectionId _connectionId ;
@@ -85,9 +83,6 @@ public BinaryConnection(ServerId serverId, EndPoint endPoint, ConnectionSettings
8583 _connectionInitializer = Ensure . IsNotNull ( connectionInitializer , nameof ( connectionInitializer ) ) ;
8684 Ensure . IsNotNull ( eventSubscriber , nameof ( eventSubscriber ) ) ;
8785
88- _backgroundTaskCancellationTokenSource = new CancellationTokenSource ( ) ;
89- _backgroundTaskCancellationToken = _backgroundTaskCancellationTokenSource . Token ;
90-
9186 _connectionId = new ConnectionId ( serverId ) ;
9287 _receiveLock = new SemaphoreSlim ( 1 ) ;
9388 _sendLock = new SemaphoreSlim ( 1 ) ;
@@ -199,8 +194,6 @@ private void Dispose(bool disposing)
199194 }
200195
201196 var stopwatch = Stopwatch . StartNew ( ) ;
202- _backgroundTaskCancellationTokenSource . Cancel ( ) ;
203- _backgroundTaskCancellationTokenSource . Dispose ( ) ;
204197 _receiveLock . Dispose ( ) ;
205198 _sendLock . Dispose ( ) ;
206199
@@ -237,7 +230,7 @@ private void EnsureMessageSizeIsValid(int messageSize)
237230
238231 public void Open ( CancellationToken cancellationToken )
239232 {
240- ThrowIfDisposed ( ) ;
233+ ThrowIfCancelledOrDisposed ( cancellationToken ) ;
241234
242235 TaskCompletionSource < bool > taskCompletionSource = null ;
243236 var connecting = false ;
@@ -274,7 +267,7 @@ public void Open(CancellationToken cancellationToken)
274267
275268 public Task OpenAsync ( CancellationToken cancellationToken )
276269 {
277- ThrowIfDisposed ( ) ;
270+ ThrowIfCancelledOrDisposed ( cancellationToken ) ;
278271
279272 lock ( _openLock )
280273 {
@@ -329,19 +322,19 @@ private async Task OpenHelperAsync(CancellationToken cancellationToken)
329322 }
330323 }
331324
332- private IByteBuffer ReceiveBuffer ( )
325+ private IByteBuffer ReceiveBuffer ( CancellationToken cancellationToken )
333326 {
334327 try
335328 {
336329 var messageSizeBytes = new byte [ 4 ] ;
337- _stream . ReadBytes ( messageSizeBytes , 0 , 4 , _backgroundTaskCancellationToken ) ;
330+ _stream . ReadBytes ( messageSizeBytes , 0 , 4 , cancellationToken ) ;
338331 var messageSize = BitConverter . ToInt32 ( messageSizeBytes , 0 ) ;
339332 EnsureMessageSizeIsValid ( messageSize ) ;
340333 var inputBufferChunkSource = new InputBufferChunkSource ( BsonChunkPool . Default ) ;
341334 var buffer = ByteBufferFactory . Create ( inputBufferChunkSource , messageSize ) ;
342335 buffer . Length = messageSize ;
343336 buffer . SetBytes ( 0 , messageSizeBytes , 0 , 4 ) ;
344- _stream . ReadBytes ( buffer , 4 , messageSize - 4 , _backgroundTaskCancellationToken ) ;
337+ _stream . ReadBytes ( buffer , 4 , messageSize - 4 , cancellationToken ) ;
345338 _lastUsedAtUtc = DateTime . UtcNow ;
346339 buffer . MakeReadOnly ( ) ;
347340 return buffer ;
@@ -370,7 +363,7 @@ private IByteBuffer ReceiveBuffer(int responseTo, CancellationToken cancellation
370363 receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
371364 while ( true )
372365 {
373- var buffer = ReceiveBuffer ( ) ;
366+ var buffer = ReceiveBuffer ( cancellationToken ) ;
374367 _dropbox . AddMessage ( buffer ) ;
375368
376369 if ( messageTask . IsCompleted )
@@ -391,20 +384,20 @@ private IByteBuffer ReceiveBuffer(int responseTo, CancellationToken cancellation
391384 }
392385 }
393386
394- private async Task < IByteBuffer > ReceiveBufferAsync ( )
387+ private async Task < IByteBuffer > ReceiveBufferAsync ( CancellationToken cancellationToken )
395388 {
396389 try
397390 {
398391 var messageSizeBytes = new byte [ 4 ] ;
399392 var readTimeout = _stream . CanTimeout ? TimeSpan . FromMilliseconds ( _stream . ReadTimeout ) : Timeout . InfiniteTimeSpan ;
400- await _stream . ReadBytesAsync ( messageSizeBytes , 0 , 4 , readTimeout , _backgroundTaskCancellationToken ) . ConfigureAwait ( false ) ;
393+ await _stream . ReadBytesAsync ( messageSizeBytes , 0 , 4 , readTimeout , cancellationToken ) . ConfigureAwait ( false ) ;
401394 var messageSize = BitConverter . ToInt32 ( messageSizeBytes , 0 ) ;
402395 EnsureMessageSizeIsValid ( messageSize ) ;
403396 var inputBufferChunkSource = new InputBufferChunkSource ( BsonChunkPool . Default ) ;
404397 var buffer = ByteBufferFactory . Create ( inputBufferChunkSource , messageSize ) ;
405398 buffer . Length = messageSize ;
406399 buffer . SetBytes ( 0 , messageSizeBytes , 0 , 4 ) ;
407- await _stream . ReadBytesAsync ( buffer , 4 , messageSize - 4 , readTimeout , _backgroundTaskCancellationToken ) . ConfigureAwait ( false ) ;
400+ await _stream . ReadBytesAsync ( buffer , 4 , messageSize - 4 , readTimeout , cancellationToken ) . ConfigureAwait ( false ) ;
408401 _lastUsedAtUtc = DateTime . UtcNow ;
409402 buffer . MakeReadOnly ( ) ;
410403 return buffer ;
@@ -433,7 +426,7 @@ private async Task<IByteBuffer> ReceiveBufferAsync(int responseTo, CancellationT
433426 receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
434427 while ( true )
435428 {
436- var buffer = await ReceiveBufferAsync ( ) . ConfigureAwait ( false ) ;
429+ var buffer = await ReceiveBufferAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
437430 _dropbox . AddMessage ( buffer ) ;
438431
439432 if ( messageTask . IsCompleted )
@@ -461,7 +454,7 @@ public ResponseMessage ReceiveMessage(
461454 CancellationToken cancellationToken )
462455 {
463456 Ensure . IsNotNull ( encoderSelector , nameof ( encoderSelector ) ) ;
464- ThrowIfDisposedOrNotOpen ( ) ;
457+ ThrowIfCancelledOrDisposedOrNotOpen ( cancellationToken ) ;
465458
466459 var helper = new ReceiveMessageHelper ( this , responseTo , messageEncoderSettings , _compressorSource ) ;
467460 try
@@ -477,7 +470,7 @@ public ResponseMessage ReceiveMessage(
477470 catch ( Exception ex )
478471 {
479472 helper . FailedReceivingMessage ( ex ) ;
480- throw ;
473+ throw WrapInOperationCanceledExceptionIfRequired ( ex , cancellationToken ) ;
481474 }
482475 }
483476
@@ -488,7 +481,7 @@ public async Task<ResponseMessage> ReceiveMessageAsync(
488481 CancellationToken cancellationToken )
489482 {
490483 Ensure . IsNotNull ( encoderSelector , nameof ( encoderSelector ) ) ;
491- ThrowIfDisposedOrNotOpen ( ) ;
484+ ThrowIfCancelledOrDisposedOrNotOpen ( cancellationToken ) ;
492485
493486 var helper = new ReceiveMessageHelper ( this , responseTo , messageEncoderSettings , _compressorSource ) ;
494487 try
@@ -504,7 +497,7 @@ public async Task<ResponseMessage> ReceiveMessageAsync(
504497 catch ( Exception ex )
505498 {
506499 helper . FailedReceivingMessage ( ex ) ;
507- throw ;
500+ throw WrapInOperationCanceledExceptionIfRequired ( ex , cancellationToken ) ;
508501 }
509502 }
510503
@@ -520,8 +513,7 @@ private void SendBuffer(IByteBuffer buffer, CancellationToken cancellationToken)
520513
521514 try
522515 {
523- // don't use the caller's cancellationToken because once we start writing a message we have to write the whole thing
524- _stream . WriteBytes ( buffer , 0 , buffer . Length , _backgroundTaskCancellationToken ) ;
516+ _stream . WriteBytes ( buffer , 0 , buffer . Length , cancellationToken ) ;
525517 _lastUsedAtUtc = DateTime . UtcNow ;
526518 }
527519 catch ( Exception ex )
@@ -549,9 +541,8 @@ private async Task SendBufferAsync(IByteBuffer buffer, CancellationToken cancell
549541
550542 try
551543 {
552- // don't use the caller's cancellationToken because once we start writing a message we have to write the whole thing
553544 var writeTimeout = _stream . CanTimeout ? TimeSpan . FromMilliseconds ( _stream . WriteTimeout ) : Timeout . InfiniteTimeSpan ;
554- await _stream . WriteBytesAsync ( buffer , 0 , buffer . Length , writeTimeout , _backgroundTaskCancellationToken ) . ConfigureAwait ( false ) ;
545+ await _stream . WriteBytesAsync ( buffer , 0 , buffer . Length , writeTimeout , cancellationToken ) . ConfigureAwait ( false ) ;
555546 _lastUsedAtUtc = DateTime . UtcNow ;
556547 }
557548 catch ( Exception ex )
@@ -570,7 +561,7 @@ private async Task SendBufferAsync(IByteBuffer buffer, CancellationToken cancell
570561 public void SendMessages ( IEnumerable < RequestMessage > messages , MessageEncoderSettings messageEncoderSettings , CancellationToken cancellationToken )
571562 {
572563 Ensure . IsNotNull ( messages , nameof ( messages ) ) ;
573- ThrowIfDisposedOrNotOpen ( ) ;
564+ ThrowIfCancelledOrDisposedOrNotOpen ( cancellationToken ) ;
574565
575566 var helper = new SendMessagesHelper ( this , messages , messageEncoderSettings ) ;
576567 try
@@ -599,14 +590,14 @@ public void SendMessages(IEnumerable<RequestMessage> messages, MessageEncoderSet
599590 catch ( Exception ex )
600591 {
601592 helper . FailedSendingMessages ( ex ) ;
602- throw ;
593+ throw WrapInOperationCanceledExceptionIfRequired ( ex , cancellationToken ) ;
603594 }
604595 }
605596
606597 public async Task SendMessagesAsync ( IEnumerable < RequestMessage > messages , MessageEncoderSettings messageEncoderSettings , CancellationToken cancellationToken )
607598 {
608599 Ensure . IsNotNull ( messages , nameof ( messages ) ) ;
609- ThrowIfDisposedOrNotOpen ( ) ;
600+ ThrowIfCancelledOrDisposedOrNotOpen ( cancellationToken ) ;
610601
611602 var helper = new SendMessagesHelper ( this , messages , messageEncoderSettings ) ;
612603 try
@@ -635,10 +626,16 @@ public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, Messag
635626 catch ( Exception ex )
636627 {
637628 helper . FailedSendingMessages ( ex ) ;
638- throw ;
629+ throw WrapInOperationCanceledExceptionIfRequired ( ex , cancellationToken ) ;
639630 }
640631 }
641632
633+ public void SetReadTimeout ( TimeSpan timeout )
634+ {
635+ ThrowIfDisposed ( ) ;
636+ _stream . ReadTimeout = ( int ) timeout . TotalMilliseconds ;
637+ }
638+
642639 // private methods
643640 private bool AnyMessageNeedsToBeCompressed ( IEnumerable < RequestMessage > messages )
644641 {
@@ -698,17 +695,15 @@ private void CompressMessage(
698695 compressedMessageEncoder . WriteMessage ( compressedMessage ) ;
699696 }
700697
701- private void ThrowIfDisposed ( )
698+ private void ThrowIfCancelledOrDisposed ( CancellationToken cancellationToken = default )
702699 {
703- if ( _state . Value == State . Disposed )
704- {
705- throw new ObjectDisposedException ( GetType ( ) . Name ) ;
706- }
700+ cancellationToken . ThrowIfCancellationRequested ( ) ;
701+ ThrowIfDisposed ( ) ;
707702 }
708703
709- private void ThrowIfDisposedOrNotOpen ( )
704+ private void ThrowIfCancelledOrDisposedOrNotOpen ( CancellationToken cancellationToken )
710705 {
711- ThrowIfDisposed ( ) ;
706+ ThrowIfCancelledOrDisposed ( cancellationToken ) ;
712707 if ( _state . Value == State . Failed )
713708 {
714709 throw new MongoConnectionClosedException ( _connectionId ) ;
@@ -719,6 +714,14 @@ private void ThrowIfDisposedOrNotOpen()
719714 }
720715 }
721716
717+ private void ThrowIfDisposed ( )
718+ {
719+ if ( _state . Value == State . Disposed )
720+ {
721+ throw new ObjectDisposedException ( GetType ( ) . Name ) ;
722+ }
723+ }
724+
722725 private Exception WrapException ( Exception ex , string action )
723726 {
724727 if (
@@ -727,7 +730,9 @@ ex is ThreadAbortException ||
727730 ex is StackOverflowException ||
728731#endif
729732 ex is MongoAuthenticationException ||
730- ex is OutOfMemoryException )
733+ ex is OutOfMemoryException ||
734+ ex is OperationCanceledException ||
735+ ex is ObjectDisposedException )
731736 {
732737 return ex;
733738 }
@@ -738,6 +743,23 @@ ex is MongoAuthenticationException ||
738743 }
739744 }
740745
746+ private Exception WrapInOperationCanceledExceptionIfRequired ( Exception exception , CancellationToken cancellationToken )
747+ {
748+ if ( exception is ObjectDisposedException objectDisposedException )
749+ {
750+ // We expect two cases here:
751+ // objectDisposedException.ObjectName == GetType().Name
752+ // objectDisposedException.Message == "The semaphore has been disposed."
753+ // but since the last one is language-specific, the only option we have is avoiding any additional conditions for ObjectDisposedException
754+ // TODO: this logic should be reviewed in the scope of https://jira.mongodb.org/browse/CSHARP-3165
755+ return new OperationCanceledException( $"The {nameof(BinaryConnection)} operation has been cancelled." , exception ) ;
756+ }
757+ else
758+ {
759+ return exception;
760+ }
761+ }
762+
741763 // nested classes
742764 private class Dropbox
743765 {
0 commit comments