@@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)
1616
1717 protected override async Task ProcessChannelAsync ( CancellationToken token )
1818 {
19- while ( await _reader . WaitToReadAsync ( token ) . ConfigureAwait ( false ) )
19+ try
2020 {
21- while ( _reader . TryRead ( out var work ) )
21+ while ( await _reader . WaitToReadAsync ( token ) . ConfigureAwait ( false ) )
2222 {
23- using ( work )
23+ while ( _reader . TryRead ( out WorkStruct work ) )
2424 {
25- try
25+ using ( work )
2626 {
27- IBasicConsumer consumer = work . Consumer ;
28- string ? consumerTag = work . ConsumerTag ;
29- switch ( work . WorkType )
27+ try
3028 {
31- case WorkType . Deliver :
32- await consumer . HandleBasicDeliverAsync (
33- consumerTag , work . DeliveryTag , work . Redelivered ,
34- work . Exchange , work . RoutingKey , work . BasicProperties , work . Body . Memory )
35- . ConfigureAwait ( false ) ;
36- break ;
37- case WorkType . Cancel :
38- consumer . HandleBasicCancel ( consumerTag ) ;
39- break ;
40- case WorkType . CancelOk :
41- consumer . HandleBasicCancelOk ( consumerTag ) ;
42- break ;
43- case WorkType . ConsumeOk :
44- consumer . HandleBasicConsumeOk ( consumerTag ) ;
45- break ;
46- case WorkType . Shutdown :
47- consumer . HandleChannelShutdown ( _channel , work . Reason ) ;
48- break ;
29+ IBasicConsumer consumer = work . Consumer ;
30+ string ? consumerTag = work . ConsumerTag ;
31+ switch ( work . WorkType )
32+ {
33+ case WorkType . Deliver :
34+ await consumer . HandleBasicDeliverAsync (
35+ consumerTag , work . DeliveryTag , work . Redelivered ,
36+ work . Exchange , work . RoutingKey , work . BasicProperties , work . Body . Memory )
37+ . ConfigureAwait ( false ) ;
38+ break ;
39+ case WorkType . Cancel :
40+ consumer . HandleBasicCancel ( consumerTag ) ;
41+ break ;
42+ case WorkType . CancelOk :
43+ consumer . HandleBasicCancelOk ( consumerTag ) ;
44+ break ;
45+ case WorkType . ConsumeOk :
46+ consumer . HandleBasicConsumeOk ( consumerTag ) ;
47+ break ;
48+ case WorkType . Shutdown :
49+ consumer . HandleChannelShutdown ( _channel , work . Reason ) ;
50+ break ;
51+ }
52+ }
53+ catch ( Exception e )
54+ {
55+ _channel . OnCallbackException ( CallbackExceptionEventArgs . Build ( e , work . WorkType . ToString ( ) , work . Consumer ) ) ;
4956 }
50- }
51- catch ( Exception e )
52- {
53- _channel . OnCallbackException ( CallbackExceptionEventArgs . Build ( e , work . WorkType . ToString ( ) , work . Consumer ) ) ;
5457 }
5558 }
5659 }
5760 }
61+ catch ( OperationCanceledException )
62+ {
63+ if ( false == token . IsCancellationRequested )
64+ {
65+ throw ;
66+ }
67+ }
5868 }
5969 }
6070}
0 commit comments