44
55use OldSound \RabbitMqBundle \Event \OnConsumeEvent ;
66use PhpAmqpLib \Exception \AMQPTimeoutException ;
7+ use PhpAmqpLib \Message \AMQPMessage ;
78
89class BatchConsumer extends Consumer
910{
@@ -17,6 +18,21 @@ class BatchConsumer extends Consumer
1718 */
1819 protected $ timeoutWait ;
1920
21+ /**
22+ * @var array
23+ */
24+ protected $ messages = array ();
25+
26+ /**
27+ * @var int
28+ */
29+ protected $ batchCounter = 0 ;
30+
31+ /**
32+ * @var \Closure
33+ */
34+ protected $ batchCallback ;
35+
2036 /**
2137 * @inheritDoc
2238 */
@@ -36,6 +52,7 @@ public function consume($msgAmount)
3652 $ this ->consumeMessage ($ timeoutWanted );
3753 $ isConsuming = true ;
3854 } catch (AMQPTimeoutException $ e ) {
55+ $ this ->batchConsume ();
3956 if ($ isConsuming ) {
4057 $ isConsuming = false ;
4158 } elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
@@ -46,10 +63,128 @@ public function consume($msgAmount)
4663 }
4764 }
4865
66+ if ($ this ->isCompleteBatch ($ isConsuming )) {
67+ $ this ->batchConsume ();
68+ }
69+
4970 $ timeoutWanted = ($ isConsuming ) ? $ this ->getTimeoutWait () : $ this ->getIdleTimeout ();
5071 }
5172 }
5273
74+ /**
75+ * @return void
76+ *
77+ * @throws \Exception
78+ */
79+ protected function batchConsume ()
80+ {
81+ if ($ this ->batchCounter == 0 ) {
82+ return ;
83+ }
84+
85+ try {
86+ call_user_func ($ this ->batchCallback );
87+ $ this ->resetBatch ();
88+ } catch (\Exception $ exception ) {
89+ $ this ->resetBatch (true );
90+ throw $ exception ;
91+ }
92+ }
93+
94+ /**
95+ * @param bool $isConsuming
96+ *
97+ * @return bool
98+ */
99+ protected function isCompleteBatch ($ isConsuming )
100+ {
101+ return $ isConsuming && $ this ->batchCounter != 0 && $ this ->batchCounter %$ this ->prefetchCount == 0 ;
102+ }
103+
104+ /**
105+ * @inheritDoc
106+ */
107+ public function stopConsuming ()
108+ {
109+ $ this ->batchConsume ();
110+
111+ parent ::stopConsuming ();
112+ }
113+
114+ /**
115+ * @inheritDoc
116+ */
117+ protected function handleProcessMessage (AMQPMessage $ msg , $ processFlag )
118+ {
119+ $ isRejectedOrReQueued = false ;
120+
121+ if ($ processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $ processFlag ) {
122+ // Reject and requeue message to RabbitMQ
123+ $ msg ->delivery_info ['channel ' ]->basic_reject ($ msg ->delivery_info ['delivery_tag ' ], true );
124+ $ isRejectedOrReQueued = true ;
125+ } else if ($ processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE ) {
126+ // NACK and requeue message to RabbitMQ
127+ $ msg ->delivery_info ['channel ' ]->basic_nack ($ msg ->delivery_info ['delivery_tag ' ], false , true );
128+ $ isRejectedOrReQueued = true ;
129+ } else if ($ processFlag === ConsumerInterface::MSG_REJECT ) {
130+ // Reject and drop
131+ $ msg ->delivery_info ['channel ' ]->basic_reject ($ msg ->delivery_info ['delivery_tag ' ], false );
132+ }
133+
134+ $ this ->consumed ++;
135+ $ this ->maybeStopConsumer ();
136+ if (!$ isRejectedOrReQueued ) {
137+ $ this ->addDeliveryTag ($ msg );
138+ }
139+
140+ if (!is_null ($ this ->getMemoryLimit ()) && $ this ->isRamAlmostOverloaded ()) {
141+ $ this ->stopConsuming ();
142+ }
143+ }
144+
145+ /**
146+ * @param bool $hasExceptions
147+ *
148+ * @return void
149+ */
150+ private function resetBatch ($ hasExceptions = false )
151+ {
152+ if ($ hasExceptions ) {
153+ array_map (function (AMQPMessage $ msg ) {
154+ $ msg ->delivery_info ['channel ' ]->basic_reject ($ msg ->delivery_info ['delivery_tag ' ], true );
155+ }, $ this ->messages );
156+ } else {
157+ array_map (function (AMQPMessage $ msg ) {
158+ $ msg ->delivery_info ['channel ' ]->basic_ack ($ msg ->delivery_info ['delivery_tag ' ]);
159+ }, $ this ->messages );
160+ }
161+
162+ $ this ->messages = array ();
163+ $ this ->batchCounter = 0 ;
164+ }
165+
166+ /**
167+ * @param AMQPMessage $message
168+ *
169+ * @return void
170+ */
171+ private function addDeliveryTag (AMQPMessage $ message )
172+ {
173+ $ this ->messages [$ this ->batchCounter ++] = $ message ;
174+ }
175+
176+ /**
177+ * @param \Closure $callback
178+ *
179+ * @return $this
180+ */
181+ public function setBatchCallback ($ callback )
182+ {
183+ $ this ->batchCallback = $ callback ;
184+
185+ return $ this ;
186+ }
187+
53188 /**
54189 * @param int $timeout
55190 *
0 commit comments