@@ -363,8 +363,16 @@ private IByteBuffer ReceiveBuffer(int responseTo, CancellationToken cancellation
363363 receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
364364 while ( true )
365365 {
366- var buffer = ReceiveBuffer ( cancellationToken ) ;
367- _dropbox . AddMessage ( buffer ) ;
366+ try
367+ {
368+ var buffer = ReceiveBuffer ( cancellationToken ) ;
369+ _dropbox . AddMessage ( buffer ) ;
370+ }
371+ catch ( Exception ex )
372+ {
373+ _dropbox . AddException ( ex ) ;
374+ throw ;
375+ }
368376
369377 if ( messageTask . IsCompleted )
370378 {
@@ -426,8 +434,16 @@ private async Task<IByteBuffer> ReceiveBufferAsync(int responseTo, CancellationT
426434 receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
427435 while ( true )
428436 {
429- var buffer = await ReceiveBufferAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
430- _dropbox . AddMessage ( buffer ) ;
437+ try
438+ {
439+ var buffer = await ReceiveBufferAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
440+ _dropbox . AddMessage ( buffer ) ;
441+ }
442+ catch ( Exception ex )
443+ {
444+ _dropbox . AddException ( ex ) ;
445+ throw ;
446+ }
431447
432448 if ( messageTask . IsCompleted )
433449 {
@@ -766,6 +782,14 @@ private class Dropbox
766782 private readonly ConcurrentDictionary< int , TaskCompletionSource< IByteBuffer>> _messages = new ConcurrentDictionary < int , TaskCompletionSource < IByteBuffer > > ( ) ;
767783
768784 // public methods
785+ public void AddException ( Exception exception )
786+ {
787+ foreach ( var taskCompletionSource in _messages . Values )
788+ {
789+ taskCompletionSource . TrySetException ( exception ) ; // has no effect on already completed tasks
790+ }
791+ }
792+
769793 public void AddMessage ( IByteBuffer message )
770794 {
771795 var responseTo = GetResponseTo( message ) ;
0 commit comments