3030//---------------------------------------------------------------------------
3131
3232using System ;
33- using System . Buffers . Binary ;
3433using System . Collections . Generic ;
3534using System . Diagnostics ;
3635using System . Text ;
3736using System . Threading ;
3837using System . Threading . Tasks ;
3938using RabbitMQ . Client ;
39+ using RabbitMQ . Client . Exceptions ;
4040
4141const ushort MAX_OUTSTANDING_CONFIRMS = 256 ;
4242
@@ -124,11 +124,33 @@ async Task PublishMessagesInBatchAsync()
124124 var sw = new Stopwatch ( ) ;
125125 sw . Start ( ) ;
126126
127+ channel . BasicReturnAsync += ( sender , ea ) =>
128+ {
129+ ulong sequenceNumber = 0 ;
130+
131+ IReadOnlyBasicProperties props = ea . BasicProperties ;
132+ if ( props . Headers is not null )
133+ {
134+ object ? maybeSeqNum = props . Headers [ Constants . PublishSequenceNumberHeader ] ;
135+ if ( maybeSeqNum is long longSequenceNumber )
136+ {
137+ sequenceNumber = ( ulong ) longSequenceNumber ;
138+ }
139+ }
140+
141+ return Console . Out . WriteLineAsync ( $ "{ DateTime . Now } [INFO] message sequence number '{ sequenceNumber } ' has been basic.return-ed") ;
142+ } ;
143+
127144 var publishTasks = new List < ValueTask > ( ) ;
128145 for ( int i = 0 ; i < MESSAGE_COUNT ; i ++ )
129146 {
147+ string rk = queueName ;
148+ if ( i % 1000 == 0 )
149+ {
150+ rk = Guid . NewGuid ( ) . ToString ( ) ;
151+ }
130152 byte [ ] body = Encoding . UTF8 . GetBytes ( i . ToString ( ) ) ;
131- publishTasks . Add ( channel . BasicPublishAsync ( exchange : string . Empty , routingKey : queueName , body : body , mandatory : true , basicProperties : props ) ) ;
153+ publishTasks . Add ( channel . BasicPublishAsync ( exchange : string . Empty , routingKey : rk , body : body , mandatory : true , basicProperties : props ) ) ;
132154 outstandingMessageCount ++ ;
133155
134156 if ( outstandingMessageCount == batchSize )
@@ -139,9 +161,13 @@ async Task PublishMessagesInBatchAsync()
139161 {
140162 await pt ;
141163 }
164+ catch ( PublishException pex )
165+ {
166+ Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw nack or return, pex.IsReturn: '{ pex . IsReturn } ', seq no: '{ pex . PublishSequenceNumber } '") ;
167+ }
142168 catch ( Exception ex )
143169 {
144- Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw nack or return , ex: '{ ex } '") ;
170+ Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw exception , ex: '{ ex } '") ;
145171 }
146172 }
147173 publishTasks . Clear ( ) ;
@@ -157,9 +183,13 @@ async Task PublishMessagesInBatchAsync()
157183 {
158184 await pt ;
159185 }
186+ catch ( PublishException pex )
187+ {
188+ Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw nack or return, pex.IsReturn: '{ pex . IsReturn } ', seq no: '{ pex . PublishSequenceNumber } '") ;
189+ }
160190 catch ( Exception ex )
161191 {
162- Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw nack or return , ex: '{ ex } '") ;
192+ Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw exception , ex: '{ ex } '") ;
163193 }
164194 }
165195 publishTasks . Clear ( ) ;
@@ -236,22 +266,23 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
236266 }
237267 }
238268
239- channel . BasicReturnAsync += ( sender , ea ) =>
269+ channel . BasicReturnAsync += async ( sender , ea ) =>
240270 {
241271 ulong sequenceNumber = 0 ;
242272
243273 IReadOnlyBasicProperties props = ea . BasicProperties ;
244274 if ( props . Headers is not null )
245275 {
246276 object ? maybeSeqNum = props . Headers [ Constants . PublishSequenceNumberHeader ] ;
247- if ( maybeSeqNum is not null )
277+ if ( maybeSeqNum is long longSequenceNumber )
248278 {
249- sequenceNumber = BinaryPrimitives . ReadUInt64BigEndian ( ( byte [ ] ) maybeSeqNum ) ;
279+ sequenceNumber = ( ulong ) longSequenceNumber ;
250280 }
251281 }
252282
253- Console . WriteLine ( $ "{ DateTime . Now } [WARNING] message sequence number { sequenceNumber } has been basic.return-ed") ;
254- return CleanOutstandingConfirms ( sequenceNumber , false ) ;
283+ await Console . Out . WriteLineAsync ( $ "{ DateTime . Now } [INFO] message sequence number '{ sequenceNumber } ' has been basic.return-ed") ;
284+
285+ await CleanOutstandingConfirms ( sequenceNumber , false ) ;
255286 } ;
256287
257288 channel . BasicAcksAsync += ( sender , ea ) => CleanOutstandingConfirms ( ea . DeliveryTag , ea . Multiple ) ;
@@ -290,13 +321,21 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
290321 // This will cause a basic.return, for fun
291322 rk = Guid . NewGuid ( ) . ToString ( ) ;
292323 }
324+
325+ var msgProps = new BasicProperties
326+ {
327+ Persistent = true ,
328+ Headers = new Dictionary < string , object ? > ( )
329+ } ;
330+
331+ msgProps . Headers . Add ( Constants . PublishSequenceNumberHeader , ( long ) nextPublishSeqNo ) ;
332+
293333 ( ulong , ValueTask ) data =
294- ( nextPublishSeqNo , channel . BasicPublishAsync ( exchange : string . Empty , routingKey : rk , body : body , mandatory : true , basicProperties : props ) ) ;
334+ ( nextPublishSeqNo , channel . BasicPublishAsync ( exchange : string . Empty , routingKey : rk , body : body , mandatory : true , basicProperties : msgProps ) ) ;
295335 publishTasks . Add ( data ) ;
296336 }
297337
298338 using var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 10 ) ) ;
299- // await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
300339 foreach ( ( ulong SeqNo , ValueTask PublishTask ) datum in publishTasks )
301340 {
302341 try
0 commit comments