@@ -71,22 +71,6 @@ public function setCallback($callback)
7171 return $ this ;
7272 }
7373
74- public function start ()
75- {
76- $ this ->setupConsumer ();
77-
78- while (count ($ this ->getChannel ()->callbacks )) {
79- $ this ->getChannel ()->wait ();
80- }
81- }
82-
83- public function execute (AMQPMessage $ msg )
84- {
85- $ this ->addMessage ($ msg );
86-
87- $ this ->maybeStopConsumer ();
88- }
89-
9074 public function consume ()
9175 {
9276 $ this ->setupConsumer ();
@@ -230,40 +214,9 @@ protected function isCompleteBatch($isConsuming)
230214 */
231215 public function processMessage (AMQPMessage $ msg )
232216 {
233- try {
234- call_user_func (array ($ this , 'execute ' ), $ msg );
235- } catch (Exception \StopConsumerException $ e ) {
236- $ this ->logger ->info ('Consumer requested restart ' , array (
237- 'amqp ' => array (
238- 'queue ' => $ this ->queueOptions ['name ' ],
239- 'message ' => $ msg ,
240- 'stacktrace ' => $ e ->getTraceAsString ()
241- )
242- ));
243- $ this ->stopConsuming ();
244- } catch (\Exception $ e ) {
245- $ this ->logger ->error ($ e ->getMessage (), array (
246- 'amqp ' => array (
247- 'queue ' => $ this ->queueOptions ['name ' ],
248- 'message ' => $ msg ,
249- 'stacktrace ' => $ e ->getTraceAsString ()
250- )
251- ));
252- $ this ->batchConsume ();
253-
254- throw $ e ;
255- } catch (\Error $ e ) {
256- $ this ->logger ->error ($ e ->getMessage (), array (
257- 'amqp ' => array (
258- 'queue ' => $ this ->queueOptions ['name ' ],
259- 'message ' => $ msg ,
260- 'stacktrace ' => $ e ->getTraceAsString ()
261- )
262- ));
263- $ this ->batchConsume ();
217+ $ this ->addMessage ($ msg );
264218
265- throw $ e ;
266- }
219+ $ this ->maybeStopConsumer ();
267220 }
268221
269222 /**
0 commit comments