File tree Expand file tree Collapse file tree 2 files changed +35
-43
lines changed
projects/RabbitMQ.Client/Impl Expand file tree Collapse file tree 2 files changed +35
-43
lines changed Original file line number Diff line number Diff line change 3030//---------------------------------------------------------------------------
3131
3232using System . Collections . Concurrent ;
33+ using System . Diagnostics ;
3334using System . Threading ;
3435using System . Threading . Tasks ;
36+ using RabbitMQ . Client . Framing ;
3537
3638namespace RabbitMQ . Client . Impl
3739{
@@ -55,8 +57,39 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
5557 {
5658 if ( _publisherConfirmationsEnabled )
5759 {
58- await ConfirmSelectAsync ( _publisherConfirmationTrackingEnabled , cancellationToken )
59- . ConfigureAwait ( false ) ;
60+ // NOTE: _rpcSemaphore is held
61+ bool enqueued = false ;
62+ var k = new ConfirmSelectAsyncRpcContinuation ( ContinuationTimeout , cancellationToken ) ;
63+
64+ try
65+ {
66+ if ( _nextPublishSeqNo == 0UL )
67+ {
68+ if ( _publisherConfirmationTrackingEnabled )
69+ {
70+ _confirmsTaskCompletionSources . Clear ( ) ;
71+ }
72+ _nextPublishSeqNo = 1 ;
73+ }
74+
75+ enqueued = Enqueue ( k ) ;
76+
77+ var method = new ConfirmSelect ( false ) ;
78+ await ModelSendAsync ( in method , k . CancellationToken )
79+ . ConfigureAwait ( false ) ;
80+
81+ bool result = await k ;
82+ Debug . Assert ( result ) ;
83+
84+ return ;
85+ }
86+ finally
87+ {
88+ if ( false == enqueued )
89+ {
90+ k . Dispose ( ) ;
91+ }
92+ }
6093 }
6194 }
6295 }
Original file line number Diff line number Diff line change @@ -1716,47 +1716,6 @@ await ModelSendAsync(in method, k.CancellationToken)
17161716 }
17171717 }
17181718
1719- // NOTE: _rpcSemaphore is held
1720- private async Task ConfirmSelectAsync ( bool publisherConfirmationTrackingEnablefd = false ,
1721- CancellationToken cancellationToken = default )
1722- {
1723- _publisherConfirmationsEnabled = true ;
1724- _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd ;
1725-
1726- bool enqueued = false ;
1727- var k = new ConfirmSelectAsyncRpcContinuation ( ContinuationTimeout , cancellationToken ) ;
1728-
1729- try
1730- {
1731- if ( _nextPublishSeqNo == 0UL )
1732- {
1733- if ( _publisherConfirmationTrackingEnabled )
1734- {
1735- _confirmsTaskCompletionSources . Clear ( ) ;
1736- }
1737- _nextPublishSeqNo = 1 ;
1738- }
1739-
1740- enqueued = Enqueue ( k ) ;
1741-
1742- var method = new ConfirmSelect ( false ) ;
1743- await ModelSendAsync ( in method , k . CancellationToken )
1744- . ConfigureAwait ( false ) ;
1745-
1746- bool result = await k ;
1747- Debug . Assert ( result ) ;
1748-
1749- return ;
1750- }
1751- finally
1752- {
1753- if ( false == enqueued )
1754- {
1755- k . Dispose ( ) ;
1756- }
1757- }
1758- }
1759-
17601719 private Task HandleAck ( ulong deliveryTag , bool multiple , CancellationToken cancellationToken = default )
17611720 {
17621721 if ( _publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && ! _confirmsTaskCompletionSources . IsEmpty )
You can’t perform that action at this time.
0 commit comments