@@ -99,7 +99,7 @@ public async Task TestBasicRoundtripConcurrent()
9999 } ) ;
100100 } ;
101101
102- consumer . Received += ( o , a ) =>
102+ consumer . Received += async ( o , a ) =>
103103 {
104104 string decoded = _encoding . GetString ( a . Body . ToArray ( ) ) ;
105105 if ( decoded == publish1 )
@@ -115,10 +115,13 @@ public async Task TestBasicRoundtripConcurrent()
115115 var ex = new InvalidOperationException ( "incorrect message - should never happen!" ) ;
116116 SetException ( ex , publish1SyncSource , publish2SyncSource ) ;
117117 }
118- return Task . CompletedTask ;
118+
119+ AsyncEventingBasicConsumer cons = ( AsyncEventingBasicConsumer ) o ;
120+ await cons . Channel . BasicAckAsync ( a . DeliveryTag , false ) ;
119121 } ;
120122
121- await _channel . BasicConsumeAsync ( qname , true , ConsumerTag . Empty , false , false , null , consumer ) ;
123+ await _channel . BasicQosAsync ( 0 , 1 , false ) ;
124+ await _channel . BasicConsumeAsync ( qname , autoAck : false , ConsumerTag . Empty , false , false , null , consumer ) ;
122125
123126 // ensure we get a delivery
124127 await AssertRanToCompletion ( publish1SyncSource . Task , publish2SyncSource . Task ) ;
@@ -216,8 +219,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
216219
217220 for ( int i = 0 ; i < publish_total ; i ++ )
218221 {
219- await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body1 ) ;
220- await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body2 ) ;
222+ await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body1 , mandatory : true ) ;
223+ await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body2 , mandatory : true ) ;
221224 await publishChannel . WaitForConfirmsOrDieAsync ( ) ;
222225 }
223226
@@ -260,7 +263,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
260263 int publish1_count = 0 ;
261264 int publish2_count = 0 ;
262265
263- consumer . Received += ( o , a ) =>
266+ consumer . Received += async ( o , a ) =>
264267 {
265268 string decoded = _encoding . GetString ( a . Body . ToArray ( ) ) ;
266269 if ( decoded == publish1 )
@@ -282,10 +285,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
282285 var ex = new InvalidOperationException ( "incorrect message - should never happen!" ) ;
283286 SetException ( ex , publish1SyncSource , publish2SyncSource ) ;
284287 }
285- return Task . CompletedTask ;
288+
289+ AsyncEventingBasicConsumer cons = ( AsyncEventingBasicConsumer ) o ;
290+ await cons . Channel . BasicAckAsync ( a . DeliveryTag , false ) ;
286291 } ;
287292
288- await consumeChannel . BasicConsumeAsync ( queueName , true , ConsumerTag . Empty , false , false , null , consumer ) ;
293+ await consumeChannel . BasicQosAsync ( 0 , 1 , false ) ;
294+ await consumeChannel . BasicConsumeAsync ( queueName , autoAck : false , ConsumerTag . Empty , false , false , null , consumer ) ;
289295 await consumerSyncSource . Task ;
290296
291297 await consumeChannel . CloseAsync ( ) ;
0 commit comments