@@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable
5555 // AMQP only allows one RPC operation to be active at a time.
5656 protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim ( 1 , 1 ) ;
5757 private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue ( ) ;
58- private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent ( true ) ;
5958
6059 private ShutdownEventArgs ? _closeReason ;
6160 public ShutdownEventArgs ? CloseReason => Volatile . Read ( ref _closeReason ) ;
@@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k)
361360 }
362361 }
363362
364- internal async Task < IChannel > OpenAsync ( bool publisherConfirmationsEnabled = false ,
365- bool publisherConfirmationTrackingEnabled = false ,
366- CancellationToken cancellationToken = default )
363+ internal async Task < IChannel > OpenAsync ( bool publisherConfirmationsEnabled ,
364+ bool publisherConfirmationTrackingEnabled ,
365+ ushort ? maxOutstandingPublisherConfirmations ,
366+ CancellationToken cancellationToken )
367367 {
368- ConfigurePublisherConfirmations ( publisherConfirmationsEnabled , publisherConfirmationTrackingEnabled ) ;
368+ ConfigurePublisherConfirmations ( publisherConfirmationsEnabled ,
369+ publisherConfirmationTrackingEnabled ,
370+ maxOutstandingPublisherConfirmations ) ;
369371
370372 bool enqueued = false ;
371373 var k = new ChannelOpenAsyncRpcContinuation ( ContinuationTimeout , cancellationToken ) ;
@@ -450,17 +452,6 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
450452 return Session . TransmitAsync ( in method , in header , body , cancellationToken ) ;
451453 }
452454
453- [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
454- protected ValueTask EnforceFlowControlAsync ( CancellationToken cancellationToken )
455- {
456- if ( _flowControlBlock . IsSet )
457- {
458- return default ;
459- }
460-
461- return _flowControlBlock . WaitAsync ( cancellationToken ) ;
462- }
463-
464455 internal Task OnCallbackExceptionAsync ( CallbackExceptionEventArgs args )
465456 {
466457 return _callbackExceptionAsyncWrapper . InvokeAsync ( this , args ) ;
@@ -540,7 +531,8 @@ protected virtual void Dispose(bool disposing)
540531
541532 ConsumerDispatcher . Dispose ( ) ;
542533 _rpcSemaphore . Dispose ( ) ;
543- _confirmSemaphore ? . Dispose ( ) ;
534+ _confirmSemaphore . Dispose ( ) ;
535+ _maxOutstandingConfirmationsSemaphore ? . Dispose ( ) ;
544536 }
545537 }
546538
@@ -561,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore()
561553
562554 ConsumerDispatcher . Dispose ( ) ;
563555 _rpcSemaphore . Dispose ( ) ;
564- _confirmSemaphore ? . Dispose ( ) ;
556+ _confirmSemaphore . Dispose ( ) ;
557+ _maxOutstandingConfirmationsSemaphore ? . Dispose ( ) ;
565558 }
566559
567560 public Task ConnectionTuneOkAsync ( ushort channelMax , uint frameMax , ushort heartbeat , CancellationToken cancellationToken )
0 commit comments