99
1010final class BatchConsumer extends BaseAmqp implements DequeuerInterface
1111{
12- /**
13- * @var int
14- */
15- private $ consumed = 0 ;
16-
1712 /**
1813 * @var \Closure|callable
1914 */
@@ -71,67 +66,36 @@ public function setCallback($callback)
7166 return $ this ;
7267 }
7368
74- public function start ()
69+ public function consume ()
7570 {
7671 $ this ->setupConsumer ();
7772
7873 while (count ($ this ->getChannel ()->callbacks )) {
79- $ this ->getChannel ()->wait ();
80- }
81- }
82-
83- public function execute (AMQPMessage $ msg )
84- {
85- $ this ->addMessage ($ msg );
86-
87- $ this ->maybeStopConsumer ();
74+ if ($ this ->isCompleteBatch ()) {
75+ $ this ->batchConsume ();
76+ }
8877
89- if (null !== $ this ->getMemoryLimit () && $ this ->isRamAlmostOverloaded ()) {
90- $ this ->stopConsuming ();
91- }
92- }
78+ $ this ->maybeStopConsumer ();
9379
94- public function consume ()
95- {
96- $ this ->setupConsumer ();
80+ $ timeout = $ this ->isEmptyBatch () ? $ this ->getIdleTimeout () : $ this ->getTimeoutWait ();
9781
98- $ isConsuming = false ;
99- $ timeoutWanted = $ this ->getTimeoutWait ();
100- while (count ($ this ->getChannel ()->callbacks )) {
101- $ this ->maybeStopConsumer ();
102- if (!$ this ->forceStop ) {
103- try {
104- $ this ->getChannel ()->wait (null , false , $ timeoutWanted );
105- $ isConsuming = true ;
106- } catch (AMQPTimeoutException $ e ) {
82+ try {
83+ $ this ->getChannel ()->wait (null , false , $ timeout );
84+ } catch (AMQPTimeoutException $ e ) {
85+ if (!$ this ->isEmptyBatch ()) {
10786 $ this ->batchConsume ();
108- if ($ isConsuming ) {
109- $ isConsuming = false ;
110- } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
111- return $ this ->getIdleTimeoutExitCode ();
112- } else {
113- throw $ e ;
114- }
87+ } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
88+ return $ this ->getIdleTimeoutExitCode ();
89+ } else {
90+ throw $ e ;
11591 }
116- } else {
117- $ this ->batchConsume ();
11892 }
119-
120- if ($ this ->isCompleteBatch ($ isConsuming )) {
121- $ this ->batchConsume ();
122- }
123-
124- $ timeoutWanted = $ isConsuming ? $ this ->getTimeoutWait () : $ this ->getIdleTimeout ();
12593 }
12694 }
12795
128- public function batchConsume ()
96+ private function batchConsume ()
12997 {
130- if ($ this ->batchCounter === 0 ) {
131- return ;
132- }
133-
134- try {
98+ try {
13599 $ processFlags = call_user_func ($ this ->callback , $ this ->messages );
136100 $ this ->handleProcessMessages ($ processFlags );
137101 $ this ->logger ->debug ('Queue message processed ' , array (
@@ -149,6 +113,7 @@ public function batchConsume()
149113 'stacktrace ' => $ e ->getTraceAsString ()
150114 )
151115 ));
116+ $ this ->resetBatch ();
152117 $ this ->stopConsuming ();
153118 } catch (\Exception $ e ) {
154119 $ this ->logger ->error ($ e ->getMessage (), array (
@@ -186,13 +151,6 @@ protected function handleProcessMessages($processFlags = null)
186151 foreach ($ processFlags as $ deliveryTag => $ processFlag ) {
187152 $ this ->handleProcessFlag ($ deliveryTag , $ processFlag );
188153 }
189-
190- $ this ->consumed ++;
191- $ this ->maybeStopConsumer ();
192-
193- if (null !== $ this ->getMemoryLimit () && $ this ->isRamAlmostOverloaded ()) {
194- $ this ->stopConsuming ();
195- }
196154 }
197155
198156 /**
@@ -201,7 +159,7 @@ protected function handleProcessMessages($processFlags = null)
201159 *
202160 * @return void
203161 */
204- private function handleProcessFlag ($ deliveryTag , $ processFlag )
162+ private function handleProcessFlag ($ deliveryTag , $ processFlag )
205163 {
206164 if ($ processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $ processFlag ) {
207165 // Reject and requeue message to RabbitMQ
@@ -219,13 +177,19 @@ private function handleProcessFlag ($deliveryTag, $processFlag)
219177 }
220178
221179 /**
222- * @param bool $isConsuming
223- *
224180 * @return bool
225181 */
226- protected function isCompleteBatch ($ isConsuming )
182+ protected function isCompleteBatch ()
227183 {
228- return $ isConsuming && $ this ->batchCounter === $ this ->prefetchCount ;
184+ return $ this ->batchCounter === $ this ->prefetchCount ;
185+ }
186+
187+ /**
188+ * @return bool
189+ */
190+ protected function isEmptyBatch ()
191+ {
192+ return $ this ->batchCounter === 0 ;
229193 }
230194
231195 /**
@@ -238,40 +202,9 @@ protected function isCompleteBatch($isConsuming)
238202 */
239203 public function processMessage (AMQPMessage $ msg )
240204 {
241- try {
242- call_user_func (array ($ this , 'execute ' ), $ msg );
243- } catch (Exception \StopConsumerException $ e ) {
244- $ this ->logger ->info ('Consumer requested restart ' , array (
245- 'amqp ' => array (
246- 'queue ' => $ this ->queueOptions ['name ' ],
247- 'message ' => $ msg ,
248- 'stacktrace ' => $ e ->getTraceAsString ()
249- )
250- ));
251- $ this ->stopConsuming ();
252- } catch (\Exception $ e ) {
253- $ this ->logger ->error ($ e ->getMessage (), array (
254- 'amqp ' => array (
255- 'queue ' => $ this ->queueOptions ['name ' ],
256- 'message ' => $ msg ,
257- 'stacktrace ' => $ e ->getTraceAsString ()
258- )
259- ));
260- $ this ->batchConsume ();
261-
262- throw $ e ;
263- } catch (\Error $ e ) {
264- $ this ->logger ->error ($ e ->getMessage (), array (
265- 'amqp ' => array (
266- 'queue ' => $ this ->queueOptions ['name ' ],
267- 'message ' => $ msg ,
268- 'stacktrace ' => $ e ->getTraceAsString ()
269- )
270- ));
271- $ this ->batchConsume ();
205+ $ this ->addMessage ($ msg );
272206
273- throw $ e ;
274- }
207+ $ this ->maybeStopConsumer ();
275208 }
276209
277210 /**
@@ -355,7 +288,9 @@ private function getMessageChannel($deliveryTag)
355288 */
356289 public function stopConsuming ()
357290 {
358- $ this ->batchConsume ();
291+ if (!$ this ->isEmptyBatch ()) {
292+ $ this ->batchConsume ();
293+ }
359294
360295 $ this ->getChannel ()->basic_cancel ($ this ->getConsumerTag ());
361296 }
@@ -390,6 +325,10 @@ protected function maybeStopConsumer()
390325 if ($ this ->forceStop ) {
391326 $ this ->stopConsuming ();
392327 }
328+
329+ if (null !== $ this ->getMemoryLimit () && $ this ->isRamAlmostOverloaded ()) {
330+ $ this ->stopConsuming ();
331+ }
393332 }
394333
395334 /**
0 commit comments