@@ -143,7 +143,7 @@ public function consumeProvider()
143143 */
144144 public function testConsume (array $ data )
145145 {
146- $ consumerCallBacks = $ data ['messages ' ];
146+ $ messageCount = count ( $ data ['messages ' ]) ;
147147
148148 // set up amqp connection
149149 $ amqpConnection = $ this ->prepareAMQPConnection ();
@@ -153,38 +153,38 @@ public function testConsume(array $data)
153153 ->method ('getChannelId ' )
154154 ->with ()
155155 ->willReturn (true );
156- $ amqpChannel ->expects ($ this ->once ())
156+ $ amqpChannel
157+ ->expects ($ this ->once ())
157158 ->method ('basic_consume ' )
158159 ->withAnyParameters ()
159160 ->willReturn (true );
161+ $ amqpChannel
162+ ->expects (self ::exactly (2 ))
163+ ->method ('is_consuming ' )
164+ ->willReturnOnConsecutiveCalls (array_merge (array_fill (0 , $ messageCount , true ), [false ]));
160165
161166 // set up consumer
162167 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
163168 // disable autosetup fabric so we do not mock more objects
164169 $ consumer ->disableAutoSetupFabric ();
165170 $ consumer ->setChannel ($ amqpChannel );
166- $ amqpChannel ->callbacks = $ consumerCallBacks ;
167171
168172 /**
169173 * Mock wait method and use a callback to remove one element each time from callbacks
170174 * This will simulate a basic consumer consume with provided messages count
171175 */
172- $ amqpChannel ->expects ($ this ->exactly (count ($ consumerCallBacks )))
176+ $ amqpChannel
177+ ->expects (self ::exactly (1 ))
173178 ->method ('wait ' )
174179 ->with (null , false , $ consumer ->getIdleTimeout ())
175- ->will (
176- $ this ->returnCallback (
177- function () use ($ amqpChannel ) {
178- /** remove an element on each loop like ... simulate an ACK */
179- array_splice ($ amqpChannel ->callbacks , 0 , 1 );
180- })
181- );
180+ ->willReturn (true );
182181
183182 $ eventDispatcher = $ this ->getMockBuilder (EventDispatcherInterface::class)
184183 ->disableOriginalConstructor ()
185184 ->getMock ();
186185
187- $ eventDispatcher ->expects ($ this ->exactly (count ($ consumerCallBacks )))
186+ $ eventDispatcher
187+ ->expects (self ::exactly (1 ))
188188 ->method ('dispatch ' )
189189 ->with ($ this ->isInstanceOf (OnConsumeEvent::class), OnConsumeEvent::NAME )
190190 ->willReturn ($ this ->isInstanceOf (OnConsumeEvent::class));
@@ -207,6 +207,10 @@ public function testIdleTimeoutExitCode()
207207 ->method ('basic_consume ' )
208208 ->withAnyParameters ()
209209 ->willReturn (true );
210+ $ amqpChannel
211+ ->expects ($ this ->any ())
212+ ->method ('is_consuming ' )
213+ ->willReturn (true );
210214
211215 // set up consumer
212216 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
@@ -215,7 +219,6 @@ public function testIdleTimeoutExitCode()
215219 $ consumer ->setChannel ($ amqpChannel );
216220 $ consumer ->setIdleTimeout (60 );
217221 $ consumer ->setIdleTimeoutExitCode (2 );
218- $ amqpChannel ->callbacks = array ('idle_timeout_exit_code ' );
219222
220223 $ amqpChannel ->expects ($ this ->exactly (1 ))
221224 ->method ('wait ' )
@@ -243,14 +246,17 @@ public function testShouldAllowContinueConsumptionAfterIdleTimeout()
243246 ->method ('basic_consume ' )
244247 ->withAnyParameters ()
245248 ->willReturn (true );
249+ $ amqpChannel
250+ ->expects ($ this ->any ())
251+ ->method ('is_consuming ' )
252+ ->willReturn (true );
246253
247254 // set up consumer
248255 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
249256 // disable autosetup fabric so we do not mock more objects
250257 $ consumer ->disableAutoSetupFabric ();
251258 $ consumer ->setChannel ($ amqpChannel );
252259 $ consumer ->setIdleTimeout (2 );
253- $ amqpChannel ->callbacks = array ('idle_timeout_exit_code ' );
254260
255261 $ amqpChannel ->expects ($ this ->exactly (2 ))
256262 ->method ('wait ' )
@@ -303,6 +309,10 @@ public function testGracefulMaxExecutionTimeoutExitCode()
303309 ->method ('basic_consume ' )
304310 ->withAnyParameters ()
305311 ->willReturn (true );
312+ $ amqpChannel
313+ ->expects ($ this ->any ())
314+ ->method ('is_consuming ' )
315+ ->willReturn (true );
306316
307317 // set up consumer
308318 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
@@ -312,7 +322,6 @@ public function testGracefulMaxExecutionTimeoutExitCode()
312322
313323 $ consumer ->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture (60 );
314324 $ consumer ->setGracefulMaxExecutionTimeoutExitCode (10 );
315- $ amqpChannel ->callbacks = array ('graceful_max_execution_timeout_test ' );
316325
317326 $ amqpChannel ->expects ($ this ->exactly (1 ))
318327 ->method ('wait ' )
@@ -339,6 +348,10 @@ public function testGracefulMaxExecutionWontWaitIfPastTheTimeout()
339348 ->method ('basic_consume ' )
340349 ->withAnyParameters ()
341350 ->willReturn (true );
351+ $ amqpChannel
352+ ->expects ($ this ->any ())
353+ ->method ('is_consuming ' )
354+ ->willReturn (true );
342355
343356 // set up consumer
344357 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
@@ -347,9 +360,9 @@ public function testGracefulMaxExecutionWontWaitIfPastTheTimeout()
347360 $ consumer ->setChannel ($ amqpChannel );
348361
349362 $ consumer ->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture (0 );
350- $ amqpChannel ->callbacks = array ('graceful_max_execution_timeout_test ' );
351363
352- $ amqpChannel ->expects ($ this ->never ())
364+ $ amqpChannel
365+ ->expects ($ this ->never ())
353366 ->method ('wait ' );
354367
355368 $ consumer ->consume (1 );
@@ -369,6 +382,10 @@ public function testTimeoutWait()
369382 ->method ('basic_consume ' )
370383 ->withAnyParameters ()
371384 ->willReturn (true );
385+ $ amqpChannel
386+ ->expects ($ this ->any ())
387+ ->method ('is_consuming ' )
388+ ->willReturn (true );
372389
373390 // set up consumer
374391 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
@@ -379,8 +396,6 @@ public function testTimeoutWait()
379396 $ consumer ->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture (60 );
380397 $ consumer ->setIdleTimeout (50 );
381398
382- $ amqpChannel ->callbacks = array ('timeout_wait_test ' );
383-
384399 $ amqpChannel ->expects ($ this ->exactly (2 ))
385400 ->method ('wait ' )
386401 ->with (null , false , $ this ->LessThanOrEqual ($ consumer ->getTimeoutWait ()) )
@@ -411,6 +426,10 @@ public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout()
411426 ->method ('basic_consume ' )
412427 ->withAnyParameters ()
413428 ->willReturn (true );
429+ $ amqpChannel
430+ ->expects ($ this ->any ())
431+ ->method ('is_consuming ' )
432+ ->willReturn (true );
414433
415434 // set up consumer
416435 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
@@ -420,7 +439,6 @@ public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout()
420439 $ consumer ->setTimeoutWait (20 );
421440
422441 $ consumer ->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture (10 );
423- $ amqpChannel ->callbacks = array ('graceful_max_execution_timeout_test ' );
424442
425443 $ amqpChannel ->expects ($ this ->once ())
426444 ->method ('wait ' )
@@ -448,6 +466,10 @@ public function testTimeoutWaitWontWaitPastIdleTimeout()
448466 ->method ('basic_consume ' )
449467 ->withAnyParameters ()
450468 ->willReturn (true );
469+ $ amqpChannel
470+ ->expects ($ this ->any ())
471+ ->method ('is_consuming ' )
472+ ->willReturn (true );
451473
452474 // set up consumer
453475 $ consumer = $ this ->getConsumer ($ amqpConnection , $ amqpChannel );
@@ -458,8 +480,6 @@ public function testTimeoutWaitWontWaitPastIdleTimeout()
458480 $ consumer ->setIdleTimeout (10 );
459481 $ consumer ->setIdleTimeoutExitCode (2 );
460482
461- $ amqpChannel ->callbacks = array ('idle_timeout_test ' );
462-
463483 $ amqpChannel ->expects ($ this ->once ())
464484 ->method ('wait ' )
465485 ->with (null , false , 10 )
0 commit comments