Skip to content

Commit 35c0b76

Browse files
Dnyaneshwar S JambhulkarDnyaneshwar S Jambhulkar
authored andcommitted
AC-14558::Migration form RabbitMQ to Apache ActiveMQ
1 parent 041dd92 commit 35c0b76

File tree

8 files changed

+41
-20
lines changed

8 files changed

+41
-20
lines changed

app/code/Magento/AsynchronousOperations/Model/MassConsumer.php

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,12 @@ public function process($maxNumberOfMessages = null)
8585
if (!isset($maxNumberOfMessages)) {
8686
$queue->subscribe($this->getTransactionCallback($queue));
8787
} else {
88-
$connectionName = $this->consumerConfig
89-
->getConsumer($this->configuration->getConsumerName())
90-
->getConnection();
9188
$this->invoker->invoke(
9289
$queue,
9390
$maxNumberOfMessages,
9491
$this->getTransactionCallback($queue),
9592
$maxIdleTime,
96-
$sleep,
97-
$connectionName
93+
$sleep
9894
);
9995
}
10096

app/code/Magento/Stomp/Model/Rpc/Publisher.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public function publish($topicName, $data)
111111
$publisher = $this->publisherConfig->getPublisher($topicName);
112112
$connectionName = $publisher->getConnection()->getName();
113113
$queue = $this->queueRepository->get($connectionName, $replyTo);
114-
$responseMessage = $queue->push($envelope);
114+
$responseMessage = $queue->callRpc($envelope);
115115
return $this->messageEncoder->decode($topicName, $responseMessage, false);
116116
}
117117
}

lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
namespace Magento\Framework\MessageQueue;
88

9+
use Magento\Framework\App\ObjectManager;
910
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillCompareInterface;
1011
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillReadInterface;
1112
use Magento\Framework\App\DeploymentConfig;
@@ -35,19 +36,28 @@ class CallbackInvoker implements CallbackInvokerInterface
3536
*/
3637
private $deploymentConfig;
3738

39+
/**
40+
* @var DefaultValueProvider
41+
*/
42+
private $defaultValueProvider;
43+
3844
/**
3945
* @param PoisonPillReadInterface $poisonPillRead
4046
* @param PoisonPillCompareInterface $poisonPillCompare
4147
* @param DeploymentConfig $deploymentConfig
48+
* @param DefaultValueProvider|null $defaultValueProvider
4249
*/
4350
public function __construct(
4451
PoisonPillReadInterface $poisonPillRead,
4552
PoisonPillCompareInterface $poisonPillCompare,
46-
DeploymentConfig $deploymentConfig
53+
DeploymentConfig $deploymentConfig,
54+
DefaultValueProvider $defaultValueProvider
4755
) {
4856
$this->poisonPillRead = $poisonPillRead;
4957
$this->poisonPillCompare = $poisonPillCompare;
5058
$this->deploymentConfig = $deploymentConfig;
59+
$this->defaultValueProvider = $defaultValueProvider
60+
?? ObjectManager::getInstance()->get(DefaultValueProvider::class);
5161
}
5262

5363
/**
@@ -69,12 +79,12 @@ public function invoke(
6979
$maxNumberOfMessages,
7080
$callback,
7181
$maxIdleTime = null,
72-
$sleep = null,
73-
$connectionName = 'amqp'
82+
$sleep = null
7483
) {
7584
$this->poisonPillVersion = $this->poisonPillRead->getLatestVersion();
7685
$sleep = (int) $sleep ?: 1;
7786
$maxIdleTime = $maxIdleTime ? (int) $maxIdleTime : PHP_INT_MAX;
87+
$connectionName = $this->defaultValueProvider->getConnection();
7888
if ($connectionName === 'stomp') {
7989
$queue->subscribeQueue();
8090
}

lib/internal/Magento/Framework/MessageQueue/CallbackInvokerInterface.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public function invoke(
2929
$maxNumberOfMessages,
3030
$callback,
3131
$maxIdleTime = null,
32-
$sleep = null,
33-
$connectionName = 'amqp'
32+
$sleep = null
3433
);
3534
}

lib/internal/Magento/Framework/MessageQueue/Consumer.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,12 @@ public function process($maxNumberOfMessages = null)
134134
if (!isset($maxNumberOfMessages)) {
135135
$queue->subscribe($this->getTransactionCallback($queue));
136136
} else {
137-
$connectionName = $this->consumerConfig
138-
->getConsumer($this->configuration->getConsumerName())
139-
->getConnection();
140137
$this->invoker->invoke(
141138
$queue,
142139
$maxNumberOfMessages,
143140
$this->getTransactionCallback($queue),
144141
$maxIdleTime,
145-
$sleep,
146-
$connectionName
142+
$sleep
147143
);
148144
}
149145
}
@@ -214,7 +210,7 @@ private function sendResponse(EnvelopeInterface $envelope)
214210
$messageProperties = $envelope->getProperties();
215211
$connectionName = $this->consumerConfig->getConsumer($this->configuration->getConsumerName())->getConnection();
216212
$queue = $this->queueRepository->get($connectionName, $messageProperties['reply_to']);
217-
$queue->push($envelope);
213+
$queue->callRpc($envelope);
218214
}
219215

220216
/**

lib/internal/Magento/Framework/MessageQueue/QueueInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionM
5555
* Push message to queue directly, without using exchange
5656
*
5757
* @param EnvelopeInterface $envelope
58-
* @return mixed
58+
* @return void
5959
* @since 103.0.0
6060
*/
6161
public function push(EnvelopeInterface $envelope);

lib/internal/Magento/Framework/Stomp/Bulk/Queue.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public function push(BaseQueueInterface $queue, string $topic, array $envelopes)
8787
if ($isSync) {
8888
$responses = [];
8989
foreach ($envelopes as $envelope) {
90-
$responses[] = $queue->push($envelope);
90+
$responses[] = $queue->callRpc($envelope);
9191
}
9292
return $responses;
9393
}

lib/internal/Magento/Framework/Stomp/Queue.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,26 @@ public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionM
205205
* @throws ConnectionLostException
206206
*/
207207
public function push(EnvelopeInterface $envelope)
208+
{
209+
$stompClient = $this->getStompProducerClient();
210+
$message = new Message($envelope->getBody(), $envelope->getProperties());
211+
try{
212+
$stompClient->send($this->queueName, $message);
213+
$stompClient->disconnect();
214+
}catch (\Stomp\Exception\StompException $e){
215+
$this->logger->info("Stomp message push failed: '{$this->queueName}' error: {$e->getMessage()}");
216+
}
217+
}
218+
219+
/**
220+
* Push message to queue and read message from queue
221+
*
222+
* @param EnvelopeInterface $envelope
223+
* @return string|null
224+
* @throws ConnectionLostException
225+
* @throws LocalizedException
226+
*/
227+
public function callRpc(EnvelopeInterface $envelope)
208228
{
209229
$stompClient = $this->getStompProducerClient();
210230
$properties = $envelope->getProperties();
@@ -224,7 +244,7 @@ public function push(EnvelopeInterface $envelope)
224244
return $message->getBody();
225245
}
226246
}catch (\Stomp\Exception\StompException $e){
227-
return null;
247+
$this->logger->info("Stomp rpc message push failed: '{$this->queueName}' error: '{$e->getMessage()}'");
228248
}
229249

230250
return null;

0 commit comments

Comments
 (0)