99
1010final class BatchConsumer extends BaseAmqp implements DequeuerInterface
1111{
12- /**
13- * @var int
14- */
15- private $ consumed = 0 ;
16-
1712 /**
1813 * @var \Closure|callable
1914 */
@@ -75,43 +70,32 @@ public function consume()
7570 {
7671 $ this ->setupConsumer ();
7772
78- $ isConsuming = false ;
79- $ timeoutWanted = $ this ->getTimeoutWait ();
8073 while (count ($ this ->getChannel ()->callbacks )) {
81- $ this ->maybeStopConsumer ();
82- if (!$ this ->forceStop ) {
83- try {
84- $ this ->getChannel ()->wait (null , false , $ timeoutWanted );
85- $ isConsuming = true ;
86- } catch (AMQPTimeoutException $ e ) {
87- $ this ->batchConsume ();
88- if ($ isConsuming ) {
89- $ isConsuming = false ;
90- } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
91- return $ this ->getIdleTimeoutExitCode ();
92- } else {
93- throw $ e ;
94- }
95- }
96- } else {
74+ if ($ this ->isCompleteBatch ()) {
9775 $ this ->batchConsume ();
9876 }
9977
100- if ( $ this ->isCompleteBatch ( $ isConsuming )) {
101- $ this -> batchConsume ();
102- }
78+ $ this ->maybeStopConsumer ();
79+
80+ $ timeout = $ this -> isEmptyBatch () ? $ this -> getIdleTimeout () : $ this -> getTimeoutWait ();
10381
104- $ timeoutWanted = $ isConsuming ? $ this ->getTimeoutWait () : $ this ->getIdleTimeout ();
82+ try {
83+ $ this ->getChannel ()->wait (null , false , $ timeout );
84+ } catch (AMQPTimeoutException $ e ) {
85+ if (!$ this ->isEmptyBatch ()) {
86+ $ this ->batchConsume ();
87+ } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
88+ return $ this ->getIdleTimeoutExitCode ();
89+ } else {
90+ throw $ e ;
91+ }
92+ }
10593 }
10694 }
10795
108- public function batchConsume ()
96+ private function batchConsume ()
10997 {
110- if ($ this ->batchCounter === 0 ) {
111- return ;
112- }
113-
114- try {
98+ try {
11599 $ processFlags = call_user_func ($ this ->callback , $ this ->messages );
116100 $ this ->handleProcessMessages ($ processFlags );
117101 $ this ->logger ->debug ('Queue message processed ' , array (
@@ -129,6 +113,7 @@ public function batchConsume()
129113 'stacktrace ' => $ e ->getTraceAsString ()
130114 )
131115 ));
116+ $ this ->resetBatch ();
132117 $ this ->stopConsuming ();
133118 } catch (\Exception $ e ) {
134119 $ this ->logger ->error ($ e ->getMessage (), array (
@@ -166,9 +151,6 @@ protected function handleProcessMessages($processFlags = null)
166151 foreach ($ processFlags as $ deliveryTag => $ processFlag ) {
167152 $ this ->handleProcessFlag ($ deliveryTag , $ processFlag );
168153 }
169-
170- $ this ->consumed ++;
171- $ this ->maybeStopConsumer ();
172154 }
173155
174156 /**
@@ -177,7 +159,7 @@ protected function handleProcessMessages($processFlags = null)
177159 *
178160 * @return void
179161 */
180- private function handleProcessFlag ($ deliveryTag , $ processFlag )
162+ private function handleProcessFlag ($ deliveryTag , $ processFlag )
181163 {
182164 if ($ processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $ processFlag ) {
183165 // Reject and requeue message to RabbitMQ
@@ -195,13 +177,19 @@ private function handleProcessFlag ($deliveryTag, $processFlag)
195177 }
196178
197179 /**
198- * @param bool $isConsuming
199- *
200180 * @return bool
201181 */
202- protected function isCompleteBatch ($ isConsuming )
182+ protected function isCompleteBatch ()
203183 {
204- return $ isConsuming && $ this ->batchCounter === $ this ->prefetchCount ;
184+ return $ this ->batchCounter === $ this ->prefetchCount ;
185+ }
186+
187+ /**
188+ * @return bool
189+ */
190+ protected function isEmptyBatch ()
191+ {
192+ return $ this ->batchCounter === 0 ;
205193 }
206194
207195 /**
@@ -300,7 +288,9 @@ private function getMessageChannel($deliveryTag)
300288 */
301289 public function stopConsuming ()
302290 {
303- $ this ->batchConsume ();
291+ if (!$ this ->isEmptyBatch ()) {
292+ $ this ->batchConsume ();
293+ }
304294
305295 $ this ->getChannel ()->basic_cancel ($ this ->getConsumerTag ());
306296 }
0 commit comments