1515
1616using System ;
1717using System . Buffers . Binary ;
18- using System . Collections . Concurrent ;
1918using System . Diagnostics ;
2019using System . IO ;
2120using System . Net ;
@@ -46,15 +45,12 @@ internal sealed class BinaryConnection : IConnection
4645 private ConnectionInitializerContext _connectionInitializerContext ;
4746 private EndPoint _endPoint ;
4847 private ConnectionDescription _description ;
49- private readonly Dropbox _dropbox = new Dropbox ( ) ;
5048 private bool _failedEventHasBeenRaised ;
5149 private DateTime _lastUsedAtUtc ;
5250 private DateTime _openedAtUtc ;
5351 private readonly object _openLock = new object ( ) ;
5452 private Task _openTask ;
55- private readonly SemaphoreSlim _receiveLock ;
5653 private CompressorType ? _sendCompressorType ;
57- private readonly SemaphoreSlim _sendLock ;
5854 private readonly ConnectionSettings _settings ;
5955 private readonly TimeSpan _socketReadTimeout ;
6056 private readonly TimeSpan _socketWriteTimeout ;
@@ -83,8 +79,6 @@ public BinaryConnection(
8379 Ensure . IsNotNull ( eventSubscriber , nameof ( eventSubscriber ) ) ;
8480
8581 _connectionId = new ConnectionId ( serverId , settings . ConnectionIdLocalValueProvider ( ) ) ;
86- _receiveLock = new SemaphoreSlim ( 1 ) ;
87- _sendLock = new SemaphoreSlim ( 1 ) ;
8882 _state = new InterlockedInt32 ( State . Initial ) ;
8983
9084 _compressorSource = new CompressorSource ( settings . Compressors ) ;
@@ -179,9 +173,6 @@ private void Dispose(bool disposing)
179173 _eventLogger . LogAndPublish ( new ConnectionClosingEvent ( _connectionId , EventContext . OperationId ) ) ;
180174
181175 var stopwatch = Stopwatch . StartNew ( ) ;
182- _receiveLock . Dispose ( ) ;
183- _sendLock . Dispose ( ) ;
184-
185176 if ( _stream != null )
186177 {
187178 try
@@ -374,50 +365,6 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
374365 }
375366 }
376367
377- private IByteBuffer ReceiveBuffer ( OperationContext operationContext , int responseTo )
378- {
379- using ( var receiveLockRequest = new SemaphoreSlimRequest ( _receiveLock , operationContext . RemainingTimeout , operationContext . CancellationToken ) )
380- {
381- var messageTask = _dropbox . GetMessageAsync ( responseTo ) ;
382- try
383- {
384- Task . WaitAny ( messageTask , receiveLockRequest . Task ) ;
385- if ( messageTask . IsCompleted )
386- {
387- return _dropbox . RemoveMessage ( responseTo ) ; // also propagates exception if any
388- }
389-
390- receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
391- while ( true )
392- {
393- try
394- {
395- var buffer = ReceiveBuffer ( operationContext ) ;
396- _dropbox . AddMessage ( buffer ) ;
397- }
398- catch ( Exception ex )
399- {
400- _dropbox . AddException ( ex ) ;
401- }
402-
403- if ( messageTask . IsCompleted )
404- {
405- return _dropbox . RemoveMessage ( responseTo ) ; // also propagates exception if any
406- }
407-
408- operationContext . ThrowIfTimedOutOrCanceled ( ) ;
409- }
410- }
411- catch
412- {
413- var ignored = messageTask . ContinueWith (
414- t => { _dropbox . RemoveMessage ( responseTo ) . Dispose ( ) ; } ,
415- TaskContinuationOptions . OnlyOnRanToCompletion ) ;
416- throw ;
417- }
418- }
419- }
420-
421368 private async Task < IByteBuffer > ReceiveBufferAsync ( OperationContext operationContext )
422369 {
423370 try
@@ -443,50 +390,6 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
443390 }
444391 }
445392
446- private async Task < IByteBuffer > ReceiveBufferAsync ( OperationContext operationContext , int responseTo )
447- {
448- using ( var receiveLockRequest = new SemaphoreSlimRequest ( _receiveLock , operationContext . RemainingTimeout , operationContext . CancellationToken ) )
449- {
450- var messageTask = _dropbox . GetMessageAsync ( responseTo ) ;
451- try
452- {
453- await Task . WhenAny ( messageTask , receiveLockRequest . Task ) . ConfigureAwait ( false ) ;
454- if ( messageTask . IsCompleted )
455- {
456- return _dropbox . RemoveMessage ( responseTo ) ; // also propagates exception if any
457- }
458-
459- await receiveLockRequest . Task . ConfigureAwait ( false ) ; // propagate exceptions
460- while ( true )
461- {
462- try
463- {
464- var buffer = await ReceiveBufferAsync ( operationContext ) . ConfigureAwait ( false ) ;
465- _dropbox . AddMessage ( buffer ) ;
466- }
467- catch ( Exception ex )
468- {
469- _dropbox . AddException ( ex ) ;
470- }
471-
472- if ( messageTask . IsCompleted )
473- {
474- return _dropbox . RemoveMessage ( responseTo ) ; // also propagates exception if any
475- }
476-
477- operationContext . ThrowIfTimedOutOrCanceled ( ) ;
478- }
479- }
480- catch
481- {
482- var ignored = messageTask . ContinueWith (
483- t => { _dropbox . RemoveMessage ( responseTo ) . Dispose ( ) ; } ,
484- TaskContinuationOptions . OnlyOnRanToCompletion ) ;
485- throw ;
486- }
487- }
488- }
489-
490393 public ResponseMessage ReceiveMessage (
491394 OperationContext operationContext ,
492395 int responseTo ,
@@ -500,11 +403,19 @@ public ResponseMessage ReceiveMessage(
500403 try
501404 {
502405 helper . ReceivingMessage ( ) ;
503- using ( var buffer = ReceiveBuffer ( operationContext , responseTo ) )
406+ while ( true )
504407 {
505- var message = helper . DecodeMessage ( operationContext , buffer , encoderSelector ) ;
506- helper . ReceivedMessage ( buffer , message ) ;
507- return message ;
408+ using ( var buffer = ReceiveBuffer ( operationContext ) )
409+ {
410+ if ( responseTo != GetResponseTo ( buffer ) )
411+ {
412+ continue ;
413+ }
414+
415+ var message = helper . DecodeMessage ( operationContext , buffer , encoderSelector ) ;
416+ helper . ReceivedMessage ( buffer , message ) ;
417+ return message ;
418+ }
508419 }
509420 }
510421 catch ( Exception ex )
@@ -515,7 +426,9 @@ public ResponseMessage ReceiveMessage(
515426 }
516427 }
517428
518- public async Task < ResponseMessage > ReceiveMessageAsync ( OperationContext operationContext , int responseTo ,
429+ public async Task < ResponseMessage > ReceiveMessageAsync (
430+ OperationContext operationContext ,
431+ int responseTo ,
519432 IMessageEncoderSelector encoderSelector ,
520433 MessageEncoderSettings messageEncoderSettings )
521434 {
@@ -526,11 +439,19 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
526439 try
527440 {
528441 helper . ReceivingMessage ( ) ;
529- using ( var buffer = await ReceiveBufferAsync ( operationContext , responseTo ) . ConfigureAwait ( false ) )
442+ while ( true )
530443 {
531- var message = helper . DecodeMessage ( operationContext , buffer , encoderSelector ) ;
532- helper . ReceivedMessage ( buffer , message ) ;
533- return message ;
444+ using ( var buffer = await ReceiveBufferAsync ( operationContext ) . ConfigureAwait ( false ) )
445+ {
446+ if ( responseTo != GetResponseTo ( buffer ) )
447+ {
448+ continue ;
449+ }
450+
451+ var message = helper . DecodeMessage ( operationContext , buffer , encoderSelector ) ;
452+ helper . ReceivedMessage ( buffer , message ) ;
453+ return message ;
454+ }
534455 }
535456 }
536457 catch ( Exception ex )
@@ -541,59 +462,49 @@ public async Task<ResponseMessage> ReceiveMessageAsync(OperationContext operatio
541462 }
542463 }
543464
465+ private int GetResponseTo ( IByteBuffer message )
466+ {
467+ var backingBytes = message . AccessBackingBytes ( 8 ) ;
468+ return BitConverter . ToInt32 ( backingBytes . Array , backingBytes . Offset ) ;
469+ }
470+
544471 private void SendBuffer ( OperationContext operationContext , IByteBuffer buffer )
545472 {
546- _sendLock . Wait ( operationContext . RemainingTimeout , operationContext . CancellationToken ) ;
547- try
473+ if ( _state . Value == State . Failed )
548474 {
549- if ( _state . Value == State . Failed )
550- {
551- throw new MongoConnectionClosedException ( _connectionId ) ;
552- }
475+ throw new MongoConnectionClosedException ( _connectionId ) ;
476+ }
553477
554- try
555- {
556- _stream . WriteBytes ( operationContext , buffer , 0 , buffer . Length , _socketWriteTimeout ) ;
557- _lastUsedAtUtc = DateTime . UtcNow ;
558- }
559- catch ( Exception ex )
560- {
561- var wrappedException = WrapExceptionIfRequired ( operationContext , ex , "sending a message to the server" ) ;
562- ConnectionFailed ( wrappedException ?? ex ) ;
563- if ( wrappedException == null ) { throw ; } else { throw wrappedException ; }
564- }
478+ try
479+ {
480+ _stream . WriteBytes ( operationContext , buffer , 0 , buffer . Length , _socketWriteTimeout ) ;
481+ _lastUsedAtUtc = DateTime . UtcNow ;
565482 }
566- finally
483+ catch ( Exception ex )
567484 {
568- _sendLock . Release ( ) ;
485+ var wrappedException = WrapExceptionIfRequired ( operationContext , ex , "sending a message to the server" ) ;
486+ ConnectionFailed ( wrappedException ?? ex ) ;
487+ if ( wrappedException == null ) { throw ; } else { throw wrappedException ; }
569488 }
570489 }
571490
572491 private async Task SendBufferAsync ( OperationContext operationContext , IByteBuffer buffer )
573492 {
574- await _sendLock . WaitAsync ( operationContext . RemainingTimeout , operationContext . CancellationToken ) . ConfigureAwait ( false ) ;
575- try
493+ if ( _state . Value == State . Failed )
576494 {
577- if ( _state . Value == State . Failed )
578- {
579- throw new MongoConnectionClosedException ( _connectionId ) ;
580- }
495+ throw new MongoConnectionClosedException ( _connectionId ) ;
496+ }
581497
582- try
583- {
584- await _stream . WriteBytesAsync ( operationContext , buffer , 0 , buffer . Length , _socketWriteTimeout ) . ConfigureAwait ( false ) ;
585- _lastUsedAtUtc = DateTime . UtcNow ;
586- }
587- catch ( Exception ex )
588- {
589- var wrappedException = WrapExceptionIfRequired ( operationContext , ex , "sending a message to the server" ) ;
590- ConnectionFailed ( wrappedException ?? ex ) ;
591- if ( wrappedException == null ) { throw ; } else { throw wrappedException ; }
592- }
498+ try
499+ {
500+ await _stream . WriteBytesAsync ( operationContext , buffer , 0 , buffer . Length , _socketWriteTimeout ) . ConfigureAwait ( false ) ;
501+ _lastUsedAtUtc = DateTime . UtcNow ;
593502 }
594- finally
503+ catch ( Exception ex )
595504 {
596- _sendLock . Release ( ) ;
505+ var wrappedException = WrapExceptionIfRequired ( operationContext , ex , "sending a message to the server" ) ;
506+ ConnectionFailed ( wrappedException ?? ex ) ;
507+ if ( wrappedException == null ) { throw ; } else { throw wrappedException ; }
597508 }
598509 }
599510
@@ -790,47 +701,6 @@ private void ThrowOperationCanceledExceptionIfRequired(Exception exception)
790701 }
791702
792703 // nested classes
793- private class Dropbox
794- {
795- private readonly ConcurrentDictionary < int , TaskCompletionSource < IByteBuffer > > _messages = new ConcurrentDictionary < int , TaskCompletionSource < IByteBuffer > > ( ) ;
796-
797- // public methods
798- public void AddException ( Exception exception )
799- {
800- foreach ( var taskCompletionSource in _messages . Values )
801- {
802- taskCompletionSource . TrySetException ( exception ) ; // has no effect on already completed tasks
803- }
804- }
805-
806- public void AddMessage ( IByteBuffer message )
807- {
808- var responseTo = GetResponseTo ( message ) ;
809- var tcs = _messages . GetOrAdd ( responseTo , x => new TaskCompletionSource < IByteBuffer > ( ) ) ;
810- tcs . TrySetResult ( message ) ;
811- }
812-
813- public Task < IByteBuffer > GetMessageAsync ( int responseTo )
814- {
815- var tcs = _messages . GetOrAdd ( responseTo , _ => new TaskCompletionSource < IByteBuffer > ( ) ) ;
816- return tcs . Task ;
817- }
818-
819- public IByteBuffer RemoveMessage ( int responseTo )
820- {
821- TaskCompletionSource < IByteBuffer > tcs ;
822- _messages . TryRemove ( responseTo , out tcs ) ;
823- return tcs . Task . GetAwaiter ( ) . GetResult ( ) ; // RemoveMessage is only called when Task is complete
824- }
825-
826- // private methods
827- private int GetResponseTo ( IByteBuffer message )
828- {
829- var backingBytes = message . AccessBackingBytes ( 8 ) ;
830- return BinaryPrimitives . ReadInt32LittleEndian ( new ReadOnlySpan < byte > ( backingBytes . Array , backingBytes . Offset , 4 ) ) ;
831- }
832- }
833-
834704 private class OpenConnectionHelper
835705 {
836706 private readonly BinaryConnection _connection ;
0 commit comments