33 * Copyright © Magento, Inc. All rights reserved.
44 * See COPYING.txt for license details.
55 */
6+
67namespace Magento \Framework \MessageQueue ;
78
9+ use Exception ;
10+ use Magento \Framework \App \ObjectManager ;
811use Magento \Framework \App \ResourceConnection ;
12+ use Magento \Framework \Communication \ConfigInterface as CommunicationConfig ;
913use Magento \Framework \Exception \LocalizedException ;
10- use Magento \Framework \Phrase ;
14+ use Magento \Framework \Exception \ NotFoundException ;
1115use Magento \Framework \MessageQueue \Consumer \ConfigInterface as ConsumerConfig ;
12- use Magento \Framework \Communication \ConfigInterface as CommunicationConfig ;
13- use Magento \Framework \MessageQueue \QueueRepository ;
16+ use Magento \Framework \Phrase ;
1417use Psr \Log \LoggerInterface ;
1518
1619/**
@@ -84,22 +87,40 @@ class Consumer implements ConsumerInterface
8487 * @param MessageEncoder $messageEncoder
8588 * @param ResourceConnection $resource
8689 * @param ConsumerConfigurationInterface $configuration
87- * @param LoggerInterface $logger
88- *
89- * @SuppressWarnings(PHPMD.UnusedFormalParameter)
90+ * @param LoggerInterface|null $logger
91+ * @param ConsumerConfig|null $consumerConfig
92+ * @param CommunicationConfig|null $communicationConfig
93+ * @param QueueRepository|null $queueRepository
94+ * @param MessageController|null $messageController
95+ * @param MessageValidator|null $messageValidator
96+ * @param EnvelopeFactory|null $envelopeFactory
97+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
9098 */
9199 public function __construct (
92100 CallbackInvokerInterface $ invoker ,
93101 MessageEncoder $ messageEncoder ,
94102 ResourceConnection $ resource ,
95103 ConsumerConfigurationInterface $ configuration ,
96- LoggerInterface $ logger = null
104+ LoggerInterface $ logger = null ,
105+ ConsumerConfig $ consumerConfig = null ,
106+ CommunicationConfig $ communicationConfig = null ,
107+ QueueRepository $ queueRepository = null ,
108+ MessageController $ messageController = null ,
109+ MessageValidator $ messageValidator = null ,
110+ EnvelopeFactory $ envelopeFactory = null
97111 ) {
98112 $ this ->invoker = $ invoker ;
99113 $ this ->messageEncoder = $ messageEncoder ;
100114 $ this ->resource = $ resource ;
101115 $ this ->configuration = $ configuration ;
102- $ this ->logger = $ logger ?: \Magento \Framework \App \ObjectManager::getInstance ()->get (LoggerInterface::class);
116+ $ this ->logger = $ logger ?: ObjectManager::getInstance ()->get (LoggerInterface::class);
117+ $ this ->consumerConfig = $ consumerConfig ?: ObjectManager::getInstance ()->get (ConsumerConfig::class);
118+ $ this ->communicationConfig = $ communicationConfig
119+ ?: ObjectManager::getInstance ()->get (CommunicationConfig::class);
120+ $ this ->queueRepository = $ queueRepository ?: ObjectManager::getInstance ()->get (QueueRepository::class);
121+ $ this ->messageController = $ messageController ?: ObjectManager::getInstance ()->get (MessageController::class);
122+ $ this ->messageValidator = $ messageValidator ?: ObjectManager::getInstance ()->get (MessageValidator::class);
123+ $ this ->envelopeFactory = $ envelopeFactory ?: ObjectManager::getInstance ()->get (EnvelopeFactory::class);
103124 }
104125
105126 /**
@@ -142,7 +163,8 @@ private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
142163 $ messageSchemaType = $ this ->configuration ->getMessageSchemaType ($ topicName );
143164 if ($ messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD ) {
144165 foreach ($ handlers as $ callback ) {
145- $ result = call_user_func_array ($ callback , $ decodedMessage );
166+ // The `array_values` is a workaround to ensure the same behavior in PHP 7 and 8.
167+ $ result = call_user_func_array ($ callback , array_values ($ decodedMessage ));
146168 return $ this ->processSyncResponse ($ topicName , $ result );
147169 }
148170 } else {
@@ -168,7 +190,7 @@ private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
168190 private function processSyncResponse ($ topicName , $ result )
169191 {
170192 if (isset ($ result )) {
171- $ this ->getMessageValidator () ->validate ($ topicName , $ result , false );
193+ $ this ->messageValidator ->validate ($ topicName , $ result , false );
172194 return $ this ->messageEncoder ->encode ($ topicName , $ result , false );
173195 } else {
174196 throw new LocalizedException (new Phrase ('No reply message resulted in RPC. ' ));
@@ -179,14 +201,15 @@ private function processSyncResponse($topicName, $result)
179201 * Send RPC response message.
180202 *
181203 * @param EnvelopeInterface $envelope
204+ *
182205 * @return void
206+ * @throws LocalizedException
183207 */
184208 private function sendResponse (EnvelopeInterface $ envelope )
185209 {
186210 $ messageProperties = $ envelope ->getProperties ();
187- $ connectionName = $ this ->getConsumerConfig ()
188- ->getConsumer ($ this ->configuration ->getConsumerName ())->getConnection ();
189- $ queue = $ this ->getQueueRepository ()->get ($ connectionName , $ messageProperties ['reply_to ' ]);
211+ $ connectionName = $ this ->consumerConfig ->getConsumer ($ this ->configuration ->getConsumerName ())->getConnection ();
212+ $ queue = $ this ->queueRepository ->get ($ connectionName , $ messageProperties ['reply_to ' ]);
190213 $ queue ->push ($ envelope );
191214 }
192215
@@ -203,12 +226,12 @@ private function getTransactionCallback(QueueInterface $queue)
203226 $ lock = null ;
204227 try {
205228 $ topicName = $ message ->getProperties ()['topic_name ' ];
206- $ topicConfig = $ this ->getCommunicationConfig () ->getTopic ($ topicName );
207- $ lock = $ this ->getMessageController () ->lock ($ message , $ this ->configuration ->getConsumerName ());
229+ $ topicConfig = $ this ->communicationConfig ->getTopic ($ topicName );
230+ $ lock = $ this ->messageController ->lock ($ message , $ this ->configuration ->getConsumerName ());
208231
209232 if ($ topicConfig [CommunicationConfig::TOPIC_IS_SYNCHRONOUS ]) {
210233 $ responseBody = $ this ->dispatchMessage ($ message , true );
211- $ responseMessage = $ this ->getEnvelopeFactory () ->create (
234+ $ responseMessage = $ this ->envelopeFactory ->create (
212235 ['body ' => $ responseBody , 'properties ' => $ message ->getProperties ()]
213236 );
214237 $ this ->sendResponse ($ responseMessage );
@@ -224,115 +247,25 @@ private function getTransactionCallback(QueueInterface $queue)
224247 $ queue ->acknowledge ($ message );
225248 } catch (MessageLockException $ exception ) {
226249 $ queue ->acknowledge ($ message );
227- } catch (\ Magento \ Framework \ MessageQueue \ ConnectionLostException $ e ) {
250+ } catch (ConnectionLostException $ exception ) {
228251 if ($ lock ) {
229- $ this ->resource ->getConnection ()
230- ->delete ($ this ->resource ->getTableName ('queue_lock ' ), ['id = ? ' => $ lock ->getId ()]);
252+ $ this ->resource ->getConnection ()->delete (
253+ $ this ->resource ->getTableName ('queue_lock ' ),
254+ ['id = ? ' => $ lock ->getId ()]
255+ );
231256 }
232- } catch (\ Magento \ Framework \ Exception \ NotFoundException $ e ) {
257+ } catch (NotFoundException $ exception ) {
233258 $ queue ->acknowledge ($ message );
234- $ this ->logger ->warning ($ e ->getMessage ());
235- } catch (\ Exception $ e ) {
236- $ queue ->reject ($ message , false , $ e ->getMessage ());
259+ $ this ->logger ->warning ($ exception ->getMessage ());
260+ } catch (Exception $ exception ) {
261+ $ queue ->reject ($ message , false , $ exception ->getMessage ());
237262 if ($ lock ) {
238- $ this ->resource ->getConnection ()
239- ->delete ($ this ->resource ->getTableName ('queue_lock ' ), ['id = ? ' => $ lock ->getId ()]);
263+ $ this ->resource ->getConnection ()->delete (
264+ $ this ->resource ->getTableName ('queue_lock ' ),
265+ ['id = ? ' => $ lock ->getId ()]
266+ );
240267 }
241268 }
242269 };
243270 }
244-
245- /**
246- * Get consumer config.
247- *
248- * @return ConsumerConfig
249- *
250- * @deprecated 103.0.0
251- */
252- private function getConsumerConfig ()
253- {
254- if ($ this ->consumerConfig === null ) {
255- $ this ->consumerConfig = \Magento \Framework \App \ObjectManager::getInstance ()->get (ConsumerConfig::class);
256- }
257- return $ this ->consumerConfig ;
258- }
259-
260- /**
261- * Get communication config.
262- *
263- * @return CommunicationConfig
264- *
265- * @deprecated 103.0.0
266- */
267- private function getCommunicationConfig ()
268- {
269- if ($ this ->communicationConfig === null ) {
270- $ this ->communicationConfig = \Magento \Framework \App \ObjectManager::getInstance ()
271- ->get (CommunicationConfig::class);
272- }
273- return $ this ->communicationConfig ;
274- }
275-
276- /**
277- * Get queue repository.
278- *
279- * @return QueueRepository
280- *
281- * @deprecated 103.0.0
282- */
283- private function getQueueRepository ()
284- {
285- if ($ this ->queueRepository === null ) {
286- $ this ->queueRepository = \Magento \Framework \App \ObjectManager::getInstance ()->get (QueueRepository::class);
287- }
288- return $ this ->queueRepository ;
289- }
290-
291- /**
292- * Get message controller.
293- *
294- * @return MessageController
295- *
296- * @deprecated 103.0.0
297- */
298- private function getMessageController ()
299- {
300- if ($ this ->messageController === null ) {
301- $ this ->messageController = \Magento \Framework \App \ObjectManager::getInstance ()
302- ->get (MessageController::class);
303- }
304- return $ this ->messageController ;
305- }
306-
307- /**
308- * Get message validator.
309- *
310- * @return MessageValidator
311- *
312- * @deprecated 103.0.0
313- */
314- private function getMessageValidator ()
315- {
316- if ($ this ->messageValidator === null ) {
317- $ this ->messageValidator = \Magento \Framework \App \ObjectManager::getInstance ()
318- ->get (MessageValidator::class);
319- }
320- return $ this ->messageValidator ;
321- }
322-
323- /**
324- * Get envelope factory.
325- *
326- * @return EnvelopeFactory
327- *
328- * @deprecated 103.0.0
329- */
330- private function getEnvelopeFactory ()
331- {
332- if ($ this ->envelopeFactory === null ) {
333- $ this ->envelopeFactory = \Magento \Framework \App \ObjectManager::getInstance ()
334- ->get (EnvelopeFactory::class);
335- }
336- return $ this ->envelopeFactory ;
337- }
338271}
0 commit comments