88
99namespace RabbitMQ . Client
1010{
11- public class AsyncDefaultBasicConsumer : IBasicConsumer , IAsyncBasicConsumer
11+ public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer
1212 {
1313 private readonly HashSet < string > _consumerTags = new HashSet < string > ( ) ;
1414
1515 /// <summary>
16- /// Creates a new instance of an <see cref="DefaultBasicConsumer "/>.
16+ /// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer "/>.
1717 /// </summary>
1818 public AsyncDefaultBasicConsumer ( )
1919 {
@@ -70,28 +70,31 @@ public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
7070 /// <summary>
7171 /// Called when the consumer is cancelled for reasons other than by a basicCancel:
7272 /// e.g. the queue has been deleted (either by this channel or by any other channel).
73- /// See <see cref="HandleBasicCancelOk "/> for notification of consumer cancellation due to basicCancel
73+ /// See <see cref="HandleBasicCancelOkAsync "/> for notification of consumer cancellation due to basicCancel
7474 /// </summary>
7575 /// <param name="consumerTag">Consumer tag this consumer is registered.</param>
76- public virtual Task HandleBasicCancel ( string consumerTag )
76+ /// <param name="cancellationToken">The cancellation token for this operation.</param>
77+ public virtual Task HandleBasicCancelAsync ( string consumerTag , CancellationToken cancellationToken )
7778 {
78- return OnCancel ( consumerTag ) ;
79+ return OnCancelAsync ( cancellationToken , consumerTag ) ;
7980 }
8081
8182 /// <summary>
8283 /// Called upon successful deregistration of the consumer from the broker.
8384 /// </summary>
8485 /// <param name="consumerTag">Consumer tag this consumer is registered.</param>
85- public virtual Task HandleBasicCancelOk ( string consumerTag )
86+ /// <param name="cancellationToken">The cancellation token for this operation.</param>
87+ public virtual Task HandleBasicCancelOkAsync ( string consumerTag , CancellationToken cancellationToken )
8688 {
87- return OnCancel ( consumerTag ) ;
89+ return OnCancelAsync ( cancellationToken , consumerTag ) ;
8890 }
8991
9092 /// <summary>
9193 /// Called upon successful registration of the consumer with the broker.
9294 /// </summary>
9395 /// <param name="consumerTag">Consumer tag this consumer is registered.</param>
94- public virtual Task HandleBasicConsumeOk ( string consumerTag )
96+ /// <param name="cancellationToken">The cancellation token for this operation.</param>
97+ public virtual Task HandleBasicConsumeOkAsync ( string consumerTag , CancellationToken cancellationToken )
9598 {
9699 _consumerTags . Add ( consumerTag ) ;
97100 IsRunning = true ;
@@ -108,13 +111,14 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
108111 /// Accessing the body at a later point is unsafe as its memory can
109112 /// be already released.
110113 /// </remarks>
111- public virtual Task HandleBasicDeliver ( string consumerTag ,
114+ public virtual Task HandleBasicDeliverAsync ( string consumerTag ,
112115 ulong deliveryTag ,
113116 bool redelivered ,
114117 string exchange ,
115118 string routingKey ,
116- in ReadOnlyBasicProperties properties ,
117- ReadOnlyMemory < byte > body )
119+ ReadOnlyBasicProperties properties ,
120+ ReadOnlyMemory < byte > body ,
121+ CancellationToken cancellationToken )
118122 {
119123 // Nothing to do here.
120124 return Task . CompletedTask ;
@@ -125,63 +129,33 @@ public virtual Task HandleBasicDeliver(string consumerTag,
125129 /// </summary>
126130 /// <param name="channel">A channel this consumer was registered on.</param>
127131 /// <param name="reason">Shutdown context.</param>
128- public virtual Task HandleChannelShutdown ( object channel , ShutdownEventArgs reason )
132+ /// <param name="cancellationToken">The cancellation token for this operation.</param>
133+ public virtual Task HandleChannelShutdownAsync ( object channel , ShutdownEventArgs reason ,
134+ CancellationToken cancellationToken )
129135 {
130136 ShutdownReason = reason ;
131- return OnCancel ( _consumerTags . ToArray ( ) ) ;
137+ return OnCancelAsync ( cancellationToken , _consumerTags . ToArray ( ) ) ;
132138 }
133139
134140 /// <summary>
135141 /// Default implementation - overridable in subclasses.</summary>
136- /// <param name="consumerTags">The set of consumer tags that where cancelled</param>
137142 /// <remarks>
143+ /// <param name="cancellationToken">The cancellation token for this operation.</param>
144+ /// <param name="consumerTags">The set of consumer tags that where cancelled</param>
138145 /// This default implementation simply sets the <see cref="IsRunning"/> property to false, and takes no further action.
139146 /// </remarks>
140- public virtual async Task OnCancel ( params string [ ] consumerTags )
147+ public virtual async Task OnCancelAsync ( CancellationToken cancellationToken , params string [ ] consumerTags )
141148 {
142149 IsRunning = false ;
143150 if ( ! _consumerCancelledWrapper . IsEmpty )
144151 {
145- // TODO cancellation token
146- await _consumerCancelledWrapper . InvokeAsync ( this , new ConsumerEventArgs ( consumerTags ) , CancellationToken . None )
152+ await _consumerCancelledWrapper . InvokeAsync ( this , new ConsumerEventArgs ( consumerTags ) , cancellationToken )
147153 . ConfigureAwait ( false ) ;
148154 }
149155 foreach ( string consumerTag in consumerTags )
150156 {
151157 _consumerTags . Remove ( consumerTag ) ;
152158 }
153159 }
154-
155- event EventHandler < ConsumerEventArgs > IBasicConsumer . ConsumerCancelled
156- {
157- add { throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ; }
158- remove { throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ; }
159- }
160-
161- void IBasicConsumer . HandleBasicCancelOk ( string consumerTag )
162- {
163- throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ;
164- }
165-
166- void IBasicConsumer . HandleBasicConsumeOk ( string consumerTag )
167- {
168- throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ;
169- }
170-
171- Task IBasicConsumer . HandleBasicDeliverAsync ( string consumerTag , ulong deliveryTag , bool redelivered , string exchange , string routingKey ,
172- ReadOnlyBasicProperties properties , ReadOnlyMemory < byte > body )
173- {
174- throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ;
175- }
176-
177- void IBasicConsumer . HandleChannelShutdown ( object channel , ShutdownEventArgs reason )
178- {
179- throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ;
180- }
181-
182- void IBasicConsumer . HandleBasicCancel ( string consumerTag )
183- {
184- throw new InvalidOperationException ( "Should never be called. Enable 'DispatchConsumersAsync'." ) ;
185- }
186160 }
187161}
0 commit comments