3535using System . Diagnostics ;
3636using System . Runtime . CompilerServices ;
3737using System . Threading ;
38+ using System . Threading . RateLimiting ;
3839using System . Threading . Tasks ;
3940using RabbitMQ . Client . Events ;
4041using RabbitMQ . Client . Exceptions ;
@@ -47,32 +48,26 @@ internal partial class Channel : IChannel, IRecoverable
4748 {
4849 private bool _publisherConfirmationsEnabled = false ;
4950 private bool _publisherConfirmationTrackingEnabled = false ;
50- private ushort ? _maxOutstandingPublisherConfirmations = null ;
51- private SemaphoreSlim ? _maxOutstandingConfirmationsSemaphore ;
5251 private ulong _nextPublishSeqNo = 0 ;
5352 private readonly SemaphoreSlim _confirmSemaphore = new ( 1 , 1 ) ;
5453 private readonly ConcurrentDictionary < ulong , TaskCompletionSource < bool > > _confirmsTaskCompletionSources = new ( ) ;
54+ private RateLimiter ? _outstandingPublisherConfirmationsRateLimiter ;
5555
56- private class PublisherConfirmationInfo
56+ private sealed class PublisherConfirmationInfo : IDisposable
5757 {
58- private ulong _publishSequenceNumber ;
5958 private TaskCompletionSource < bool > ? _publisherConfirmationTcs ;
59+ private readonly RateLimitLease ? _lease ;
6060
61- internal PublisherConfirmationInfo ( )
61+ internal PublisherConfirmationInfo ( ulong publishSequenceNumber ,
62+ TaskCompletionSource < bool > ? publisherConfirmationTcs ,
63+ RateLimitLease ? lease )
6264 {
63- _publishSequenceNumber = 0 ;
64- _publisherConfirmationTcs = null ;
65- }
66-
67- internal PublisherConfirmationInfo ( ulong publishSequenceNumber , TaskCompletionSource < bool > ? publisherConfirmationTcs )
68- {
69- _publishSequenceNumber = publishSequenceNumber ;
65+ PublishSequenceNumber = publishSequenceNumber ;
7066 _publisherConfirmationTcs = publisherConfirmationTcs ;
67+ _lease = lease ;
7168 }
7269
73- internal ulong PublishSequenceNumber => _publishSequenceNumber ;
74-
75- internal TaskCompletionSource < bool > ? PublisherConfirmationTcs => _publisherConfirmationTcs ;
70+ internal ulong PublishSequenceNumber { get ; }
7671
7772 internal async Task MaybeWaitForConfirmationAsync ( CancellationToken cancellationToken )
7873 {
@@ -95,6 +90,11 @@ internal bool MaybeHandleException(Exception ex)
9590
9691 return exceptionWasHandled ;
9792 }
93+
94+ public void Dispose ( )
95+ {
96+ _lease ? . Dispose ( ) ;
97+ }
9898 }
9999
100100 public async ValueTask < ulong > GetNextPublishSequenceNumberAsync ( CancellationToken cancellationToken = default )
@@ -119,18 +119,11 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
119119
120120 private void ConfigurePublisherConfirmations ( bool publisherConfirmationsEnabled ,
121121 bool publisherConfirmationTrackingEnabled ,
122- ushort ? maxOutstandingPublisherConfirmations )
122+ RateLimiter ? outstandingPublisherConfirmationsRateLimiter )
123123 {
124124 _publisherConfirmationsEnabled = publisherConfirmationsEnabled ;
125125 _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled ;
126- _maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations ;
127-
128- if ( _publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null )
129- {
130- _maxOutstandingConfirmationsSemaphore = new SemaphoreSlim (
131- ( int ) _maxOutstandingPublisherConfirmations ,
132- ( int ) _maxOutstandingPublisherConfirmations ) ;
133- }
126+ _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter ;
134127 }
135128
136129 private async Task MaybeConfirmSelect ( CancellationToken cancellationToken )
@@ -282,11 +275,15 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
282275 {
283276 if ( _publisherConfirmationsEnabled )
284277 {
285- if ( _publisherConfirmationTrackingEnabled &&
286- _maxOutstandingConfirmationsSemaphore is not null )
278+ RateLimitLease ? lease = null ;
279+ if ( _publisherConfirmationTrackingEnabled )
287280 {
288- await _maxOutstandingConfirmationsSemaphore . WaitAsync ( cancellationToken )
289- . ConfigureAwait ( false ) ;
281+ if ( _outstandingPublisherConfirmationsRateLimiter is not null )
282+ {
283+ lease = await _outstandingPublisherConfirmationsRateLimiter . AcquireAsync (
284+ cancellationToken : cancellationToken )
285+ . ConfigureAwait ( false ) ;
286+ }
290287 }
291288
292289 await _confirmSemaphore . WaitAsync ( cancellationToken )
@@ -303,7 +300,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
303300
304301 _nextPublishSeqNo ++ ;
305302
306- return new PublisherConfirmationInfo ( publishSequenceNumber , publisherConfirmationTcs ) ;
303+ return new PublisherConfirmationInfo ( publishSequenceNumber , publisherConfirmationTcs , lease ) ;
307304 }
308305 else
309306 {
@@ -339,18 +336,19 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
339336 {
340337 if ( _publisherConfirmationsEnabled )
341338 {
342- if ( _publisherConfirmationTrackingEnabled &&
343- _maxOutstandingConfirmationsSemaphore is not null )
344- {
345- _maxOutstandingConfirmationsSemaphore . Release ( ) ;
346- }
347-
348339 _confirmSemaphore . Release ( ) ;
349340
350341 if ( publisherConfirmationInfo is not null )
351342 {
352- await publisherConfirmationInfo . MaybeWaitForConfirmationAsync ( cancellationToken )
353- . ConfigureAwait ( false ) ;
343+ try
344+ {
345+ await publisherConfirmationInfo . MaybeWaitForConfirmationAsync ( cancellationToken )
346+ . ConfigureAwait ( false ) ;
347+ }
348+ finally
349+ {
350+ publisherConfirmationInfo . Dispose ( ) ;
351+ }
354352 }
355353 }
356354 }
0 commit comments