@@ -44,7 +44,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
4444 private readonly System . Threading . Channels . ChannelWriter < WorkStruct > _writer ;
4545 private readonly Task _worker ;
4646 private readonly ushort _concurrency ;
47- private bool _quiesce = false ;
47+ private long _isQuiescing ;
4848 private bool _disposed ;
4949
5050 internal ConsumerDispatcherChannelBase ( Impl . Channel channel , ushort concurrency )
@@ -79,15 +79,15 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
7979 }
8080 }
8181
82- public bool IsShutdown => _quiesce ;
82+ public bool IsShutdown => IsQuiescing ;
8383
8484 public ushort Concurrency => _concurrency ;
8585
8686 public async ValueTask HandleBasicConsumeOkAsync ( IAsyncBasicConsumer consumer , string consumerTag , CancellationToken cancellationToken )
8787 {
8888 cancellationToken . ThrowIfCancellationRequested ( ) ;
8989
90- if ( false == _disposed && false == _quiesce )
90+ if ( false == _disposed && false == IsQuiescing )
9191 {
9292 try
9393 {
@@ -110,7 +110,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
110110 {
111111 cancellationToken . ThrowIfCancellationRequested ( ) ;
112112
113- if ( false == _disposed && false == _quiesce )
113+ if ( false == _disposed && false == IsQuiescing )
114114 {
115115 IAsyncBasicConsumer consumer = GetConsumerOrDefault ( consumerTag ) ;
116116 var work = WorkStruct . CreateDeliver ( consumer , consumerTag , deliveryTag , redelivered , exchange , routingKey , basicProperties , body ) ;
@@ -123,7 +123,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
123123 {
124124 cancellationToken . ThrowIfCancellationRequested ( ) ;
125125
126- if ( false == _disposed && false == _quiesce )
126+ if ( false == _disposed && false == IsQuiescing )
127127 {
128128 IAsyncBasicConsumer consumer = GetAndRemoveConsumer ( consumerTag ) ;
129129 WorkStruct work = WorkStruct . CreateCancelOk ( consumer , consumerTag ) ;
@@ -136,7 +136,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
136136 {
137137 cancellationToken . ThrowIfCancellationRequested ( ) ;
138138
139- if ( false == _disposed && false == _quiesce )
139+ if ( false == _disposed && false == IsQuiescing )
140140 {
141141 IAsyncBasicConsumer consumer = GetAndRemoveConsumer ( consumerTag ) ;
142142 WorkStruct work = WorkStruct . CreateCancel ( consumer , consumerTag ) ;
@@ -147,7 +147,7 @@ await _writer.WriteAsync(work, cancellationToken)
147147
148148 public void Quiesce ( )
149149 {
150- _quiesce = true ;
150+ Interlocked . Exchange ( ref _isQuiescing , 1 ) ;
151151 }
152152
153153 public async Task WaitForShutdownAsync ( )
@@ -157,7 +157,7 @@ public async Task WaitForShutdownAsync()
157157 return ;
158158 }
159159
160- if ( _quiesce )
160+ if ( IsQuiescing )
161161 {
162162 try
163163 {
@@ -193,6 +193,19 @@ await _worker
193193 }
194194 }
195195
196+ protected bool IsQuiescing
197+ {
198+ get
199+ {
200+ if ( Interlocked . Read ( ref _isQuiescing ) == 1 )
201+ {
202+ return true ;
203+ }
204+
205+ return false ;
206+ }
207+ }
208+
196209 protected sealed override void ShutdownConsumer ( IAsyncBasicConsumer consumer , ShutdownEventArgs reason )
197210 {
198211 _writer . TryWrite ( WorkStruct . CreateShutdown ( consumer , reason ) ) ;
0 commit comments