@@ -203,110 +203,106 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
203203 Assert . Equal ( queueName , q . QueueName ) ;
204204
205205 Task publishTask = Task . Run ( async ( ) =>
206+ {
207+ await using IConnection publishConn = await _connFactory . CreateConnectionAsync ( ) ;
208+ publishConn . ConnectionShutdownAsync += ( o , ea ) =>
209+ {
210+ HandleConnectionShutdown ( publishConn , ea , ( args ) =>
211+ {
212+ MaybeSetException ( args , publish1SyncSource , publish2SyncSource ) ;
213+ } ) ;
214+ return Task . CompletedTask ;
215+ } ;
216+ await using ( IChannel publishChannel = await publishConn . CreateChannelAsync ( ) )
217+ {
218+ AddCallbackExceptionHandlers ( publishConn , publishChannel ) ;
219+ publishChannel . DefaultConsumer = new DefaultAsyncConsumer ( publishChannel ,
220+ "publishChannel," , _output ) ;
221+ publishChannel . ChannelShutdownAsync += ( o , ea ) =>
206222 {
207- using ( IConnection publishConn = await _connFactory . CreateConnectionAsync ( ) )
223+ HandleChannelShutdown ( publishChannel , ea , ( args ) =>
208224 {
209- publishConn . ConnectionShutdownAsync += ( o , ea ) =>
210- {
211- HandleConnectionShutdown ( publishConn , ea , ( args ) =>
212- {
213- MaybeSetException ( args , publish1SyncSource , publish2SyncSource ) ;
214- } ) ;
215- return Task . CompletedTask ;
216- } ;
217- using ( IChannel publishChannel = await publishConn . CreateChannelAsync ( ) )
218- {
219- AddCallbackExceptionHandlers ( publishConn , publishChannel ) ;
220- publishChannel . DefaultConsumer = new DefaultAsyncConsumer ( publishChannel ,
221- "publishChannel," , _output ) ;
222- publishChannel . ChannelShutdownAsync += ( o , ea ) =>
223- {
224- HandleChannelShutdown ( publishChannel , ea , ( args ) =>
225- {
226- MaybeSetException ( args , publish1SyncSource , publish2SyncSource ) ;
227- } ) ;
228- return Task . CompletedTask ;
229- } ;
230- await publishChannel . ConfirmSelectAsync ( ) ;
231-
232- for ( int i = 0 ; i < publish_total ; i ++ )
233- {
234- await publishChannel . BasicPublishAsync ( string . Empty , queueName , body1 ) ;
235- await publishChannel . BasicPublishAsync ( string . Empty , queueName , body2 ) ;
236- await publishChannel . WaitForConfirmsOrDieAsync ( ) ;
237- }
238-
239- await publishChannel . CloseAsync ( ) ;
240- }
225+ MaybeSetException ( args , publish1SyncSource , publish2SyncSource ) ;
226+ } ) ;
227+ return Task . CompletedTask ;
228+ } ;
229+ await publishChannel . ConfirmSelectAsync ( ) ;
241230
242- await publishConn . CloseAsync ( ) ;
243- }
244- } ) ;
231+ for ( int i = 0 ; i < publish_total ; i ++ )
232+ {
233+ await publishChannel . BasicPublishAsync ( string . Empty , queueName , body1 ) ;
234+ await publishChannel . BasicPublishAsync ( string . Empty , queueName , body2 ) ;
235+ await publishChannel . WaitForConfirmsOrDieAsync ( ) ;
236+ }
237+
238+ await publishChannel . CloseAsync ( ) ;
239+ }
240+
241+ await publishConn . CloseAsync ( ) ;
242+ } ) ;
245243
246244
247245 int publish1_count = 0 ;
248246 int publish2_count = 0 ;
249247
250248 Task consumeTask = Task . Run ( async ( ) =>
249+ {
250+ await using IConnection consumeConn = await _connFactory . CreateConnectionAsync ( ) ;
251+ consumeConn . ConnectionShutdownAsync += ( o , ea ) =>
252+ {
253+ HandleConnectionShutdown ( consumeConn , ea , ( args ) =>
254+ {
255+ MaybeSetException ( ea , publish1SyncSource , publish2SyncSource ) ;
256+ } ) ;
257+ return Task . CompletedTask ;
258+ } ;
259+ await using ( IChannel consumeChannel = await consumeConn . CreateChannelAsync ( ) )
260+ {
261+ AddCallbackExceptionHandlers ( consumeConn , consumeChannel ) ;
262+ consumeChannel . DefaultConsumer = new DefaultAsyncConsumer ( consumeChannel ,
263+ "consumeChannel," , _output ) ;
264+ consumeChannel . ChannelShutdownAsync += ( o , ea ) =>
251265 {
252- using ( IConnection consumeConn = await _connFactory . CreateConnectionAsync ( ) )
266+ HandleChannelShutdown ( consumeChannel , ea , ( args ) =>
253267 {
254- consumeConn . ConnectionShutdownAsync += ( o , ea ) =>
268+ MaybeSetException ( ea , publish1SyncSource , publish2SyncSource ) ;
269+ } ) ;
270+ return Task . CompletedTask ;
271+ } ;
272+
273+ var consumer = new AsyncEventingBasicConsumer ( consumeChannel ) ;
274+ consumer . ReceivedAsync += ( o , a ) =>
275+ {
276+ if ( ByteArraysEqual ( a . Body . ToArray ( ) , body1 ) )
277+ {
278+ if ( Interlocked . Increment ( ref publish1_count ) >= publish_total )
255279 {
256- HandleConnectionShutdown ( consumeConn , ea , ( args ) =>
257- {
258- MaybeSetException ( ea , publish1SyncSource , publish2SyncSource ) ;
259- } ) ;
260- return Task . CompletedTask ;
261- } ;
262- using ( IChannel consumeChannel = await consumeConn . CreateChannelAsync ( ) )
280+ publish1SyncSource . TrySetResult ( true ) ;
281+ }
282+ }
283+ else if ( ByteArraysEqual ( a . Body . ToArray ( ) , body2 ) )
284+ {
285+ if ( Interlocked . Increment ( ref publish2_count ) >= publish_total )
263286 {
264- AddCallbackExceptionHandlers ( consumeConn , consumeChannel ) ;
265- consumeChannel . DefaultConsumer = new DefaultAsyncConsumer ( consumeChannel ,
266- "consumeChannel," , _output ) ;
267- consumeChannel . ChannelShutdownAsync += ( o , ea ) =>
268- {
269- HandleChannelShutdown ( consumeChannel , ea , ( args ) =>
270- {
271- MaybeSetException ( ea , publish1SyncSource , publish2SyncSource ) ;
272- } ) ;
273- return Task . CompletedTask ;
274- } ;
275-
276- var consumer = new AsyncEventingBasicConsumer ( consumeChannel ) ;
277- consumer . ReceivedAsync += ( o , a ) =>
278- {
279- if ( ByteArraysEqual ( a . Body . ToArray ( ) , body1 ) )
280- {
281- if ( Interlocked . Increment ( ref publish1_count ) >= publish_total )
282- {
283- publish1SyncSource . TrySetResult ( true ) ;
284- }
285- }
286- else if ( ByteArraysEqual ( a . Body . ToArray ( ) , body2 ) )
287- {
288- if ( Interlocked . Increment ( ref publish2_count ) >= publish_total )
289- {
290- publish2SyncSource . TrySetResult ( true ) ;
291- }
292- }
293- else
294- {
295- var ex = new InvalidOperationException ( "incorrect message - should never happen!" ) ;
296- SetException ( ex , publish1SyncSource , publish2SyncSource ) ;
297- }
298- return Task . CompletedTask ;
299- } ;
300-
301- await consumeChannel . BasicConsumeAsync ( queueName , true , string . Empty , false , false , null , consumer ) ;
302- await consumerSyncSource . Task ;
303-
304- await consumeChannel . CloseAsync ( ) ;
287+ publish2SyncSource . TrySetResult ( true ) ;
305288 }
306-
307- await consumeConn . CloseAsync ( ) ;
308289 }
309- } ) ;
290+ else
291+ {
292+ var ex = new InvalidOperationException ( "incorrect message - should never happen!" ) ;
293+ SetException ( ex , publish1SyncSource , publish2SyncSource ) ;
294+ }
295+ return Task . CompletedTask ;
296+ } ;
297+
298+ await consumeChannel . BasicConsumeAsync ( queueName , true , string . Empty , false , false , null , consumer ) ;
299+ await consumerSyncSource . Task ;
300+
301+ await consumeChannel . CloseAsync ( ) ;
302+ }
303+
304+ await consumeConn . CloseAsync ( ) ;
305+ } ) ;
310306
311307 try
312308 {
@@ -653,15 +649,13 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
653649 var consumer1 = new AsyncEventingBasicConsumer ( _channel ) ;
654650 consumer1 . ReceivedAsync += async ( sender , args ) =>
655651 {
656- using ( IChannel innerChannel = await _conn . CreateChannelAsync ( ) )
657- {
658- await innerChannel . ConfirmSelectAsync ( ) ;
659- await innerChannel . BasicPublishAsync ( exchangeName , queue2Name ,
660- mandatory : true ,
661- body : Encoding . ASCII . GetBytes ( nameof ( TestCreateChannelWithinAsyncConsumerCallback_GH650 ) ) ) ;
662- await innerChannel . WaitForConfirmsOrDieAsync ( ) ;
663- await innerChannel . CloseAsync ( ) ;
664- }
652+ await using IChannel innerChannel = await _conn . CreateChannelAsync ( ) ;
653+ await innerChannel . ConfirmSelectAsync ( ) ;
654+ await innerChannel . BasicPublishAsync ( exchangeName , queue2Name ,
655+ mandatory : true ,
656+ body : Encoding . ASCII . GetBytes ( nameof ( TestCreateChannelWithinAsyncConsumerCallback_GH650 ) ) ) ;
657+ await innerChannel . WaitForConfirmsOrDieAsync ( ) ;
658+ await innerChannel . CloseAsync ( ) ;
665659 } ;
666660 await _channel . BasicConsumeAsync ( queue1Name , autoAck : true , consumer1 ) ;
667661
@@ -720,12 +714,10 @@ private async Task ValidateConsumerDispatchConcurrency()
720714 AutorecoveringChannel autorecoveringChannel = ( AutorecoveringChannel ) _channel ;
721715 Assert . Equal ( ConsumerDispatchConcurrency , autorecoveringChannel . ConsumerDispatcher . Concurrency ) ;
722716 Assert . Equal ( _consumerDispatchConcurrency , autorecoveringChannel . ConsumerDispatcher . Concurrency ) ;
723- using ( IChannel ch = await _conn . CreateChannelAsync (
724- consumerDispatchConcurrency : expectedConsumerDispatchConcurrency ) )
725- {
726- AutorecoveringChannel ach = ( AutorecoveringChannel ) ch ;
727- Assert . Equal ( expectedConsumerDispatchConcurrency , ach . ConsumerDispatcher . Concurrency ) ;
728- }
717+ await using IChannel ch = await _conn . CreateChannelAsync (
718+ consumerDispatchConcurrency : expectedConsumerDispatchConcurrency ) ;
719+ AutorecoveringChannel ach = ( AutorecoveringChannel ) ch ;
720+ Assert . Equal ( expectedConsumerDispatchConcurrency , ach . ConsumerDispatcher . Concurrency ) ;
729721 }
730722
731723 private static void SetException ( Exception ex , params TaskCompletionSource < bool > [ ] tcsAry )
0 commit comments