Skip to content

Commit 3cb9b9b

Browse files
committed
AC-14558::Migration form RabbitMQ to Apache ActiveMQ
1 parent cad5933 commit 3cb9b9b

File tree

3 files changed

+310
-1
lines changed

3 files changed

+310
-1
lines changed

dev/tests/integration/testsuite/Magento/Framework/Console/CliStateTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php
22
/**
33
* Copyright 2025 Adobe.
4-
* All rights reserved.
4+
* All Rights Reserved.
55
*/
66
declare(strict_types=1);
77

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?php
2+
/**
3+
* Copyright 2025 Adobe
4+
* All Rights Reserved.
5+
*/
6+
namespace Magento\Framework\MessageQueue\UseCase;
7+
8+
use Magento\Framework\MessageQueue\DefaultValueProvider;
9+
use Magento\TestFramework\Helper\Bootstrap;
10+
use Magento\TestModuleAsyncStomp\Model\AsyncTestData;
11+
12+
class WildcardTopicStompTest extends QueueTestCaseAbstract
13+
{
14+
/**
15+
* @var string[]
16+
*/
17+
protected $consumers = [
18+
'stomp.wildcard.queue.one.consumer',
19+
'stomp.wildcard.queue.two.consumer'
20+
];
21+
22+
/**
23+
* @var string
24+
*/
25+
private $connectionType;
26+
27+
/**
28+
* @inheritdoc
29+
*/
30+
protected function setUp(): void
31+
{
32+
$this->objectManager = Bootstrap::getObjectManager();
33+
/** @var DefaultValueProvider $defaultValueProvider */
34+
$defaultValueProvider = $this->objectManager->get(DefaultValueProvider::class);
35+
$this->connectionType = $defaultValueProvider->getConnection();
36+
37+
if ($this->connectionType === 'stomp') {
38+
parent::setUp();
39+
}
40+
}
41+
42+
/**
43+
* @param string $topic
44+
* @param string[] $matchingQueues
45+
* @param string[] $nonMatchingQueues
46+
*
47+
* @dataProvider wildCardTopicsDataProvider
48+
*/
49+
public function testWildCardMatchingTopic($topic, $matchingQueues, $nonMatchingQueues)
50+
{
51+
if ($this->connectionType === 'amqp') {
52+
$this->markTestSkipped('STOMP test skipped because AMQP connection is available.
53+
This test is STOMP-specific.');
54+
}
55+
56+
$testObject = $this->generateTestObject();
57+
$this->publisher->publish($topic, $testObject);
58+
59+
$this->waitForAsynchronousResult(count($matchingQueues), $this->logFilePath);
60+
61+
$this->assertFileExists($this->logFilePath, "No handlers invoked (log file was not created).");
62+
foreach ($nonMatchingQueues as $queueName) {
63+
$this->assertStringNotContainsString($queueName, file_get_contents($this->logFilePath));
64+
}
65+
foreach ($matchingQueues as $queueName) {
66+
$this->assertStringContainsString($queueName, file_get_contents($this->logFilePath));
67+
}
68+
}
69+
70+
public static function wildCardTopicsDataProvider()
71+
{
72+
return [
73+
'stomp.segment1.segment2.segment3.wildcard' => [
74+
'stomp.segment1.segment2.segment3.wildcard',
75+
['stomp.wildcard.queue.one'],
76+
['stomp.wildcard.queue.three']
77+
],
78+
'stomp.segment2.segment3.wildcard' => [
79+
'stomp.segment2.segment3.wildcard',
80+
['stomp.wildcard.queue.one'],
81+
['stomp.wildcard.queue.two']
82+
]
83+
];
84+
}
85+
86+
public function testWildCardNonMatchingTopic()
87+
{
88+
if ($this->connectionType === 'amqp') {
89+
$this->markTestSkipped('STOMP test skipped because AMQP connection is available.
90+
This test is STOMP-specific.');
91+
}
92+
93+
$testObject = $this->generateTestObject();
94+
$this->publisher->publish('no.match.at.all', $testObject);
95+
sleep(2);
96+
$this->assertFileDoesNotExist($this->logFilePath, "No log file must be created for non-matching topic.");
97+
}
98+
99+
/**
100+
* @return AsyncTestData
101+
*/
102+
private function generateTestObject()
103+
{
104+
$testObject = $this->objectManager->create(AsyncTestData::class); // @phpstan-ignore-line
105+
$testObject->setValue('||Message Contents||');
106+
$testObject->setTextFilePath($this->logFilePath);
107+
return $testObject;
108+
}
109+
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
<?php
2+
/**
3+
* Copyright 2025 Adobe
4+
* All Rights Reserved.
5+
*/
6+
namespace Magento\Framework\Stomp;
7+
8+
use Magento\Framework\App\DeploymentConfig\FileReader;
9+
use Magento\Framework\App\DeploymentConfig\Writer;
10+
use Magento\Framework\Config\File\ConfigFilePool;
11+
use Magento\Framework\Filesystem;
12+
use Magento\Framework\MessageQueue\DefaultValueProvider;
13+
use Magento\Framework\MessageQueue\UseCase\QueueTestCaseAbstract;
14+
use Magento\TestFramework\Helper\Bootstrap;
15+
use Magento\TestModuleAsyncStomp\Model\AsyncTestData;
16+
17+
class WaitAndNotWaitMessagesTest extends QueueTestCaseAbstract
18+
{
19+
/**
20+
* @var FileReader
21+
*/
22+
private $reader;
23+
24+
/**
25+
* @var Filesystem
26+
*/
27+
private $filesystem;
28+
29+
/**
30+
* @var array
31+
*/
32+
private $config;
33+
34+
/**
35+
* @var AsyncTestData
36+
*/
37+
protected $msgObject;
38+
39+
/**
40+
* @var array
41+
*/
42+
protected $consumers = ['stomp.mixed.sync.and.async.queue.consumer'];
43+
44+
/**
45+
* @var string[]
46+
*/
47+
protected $messages = ['message1', 'message2', 'message3'];
48+
49+
/**
50+
* @var int|null
51+
*/
52+
protected $maxMessages = 4;
53+
54+
/**
55+
* @var string
56+
*/
57+
private $connectionType;
58+
59+
/**
60+
* @inheritdoc
61+
*/
62+
protected function setUp(): void
63+
{
64+
$this->objectManager = Bootstrap::getObjectManager();
65+
/** @var DefaultValueProvider $defaultValueProvider */
66+
$defaultValueProvider = $this->objectManager->get(DefaultValueProvider::class);
67+
$this->connectionType = $defaultValueProvider->getConnection();
68+
69+
if ($this->connectionType === 'stomp') {
70+
parent::setUp();
71+
$this->reader = $this->objectManager->get(FileReader::class);
72+
$this->filesystem = $this->objectManager->get(Filesystem::class);
73+
$this->config = $this->loadConfig();
74+
}
75+
}
76+
77+
/**
78+
* Get message object, creating it lazily
79+
*
80+
* @return AsyncTestData
81+
*/
82+
private function getMsgObject(): AsyncTestData
83+
{
84+
if (!$this->msgObject) {
85+
// phpstan:ignore "Class Magento\TestModuleAsyncStomp\Model\AsyncTestData not found."
86+
$this->msgObject = $this->objectManager->create(AsyncTestData::class);
87+
}
88+
return $this->msgObject;
89+
}
90+
91+
/**
92+
* Check if consumers wait for messages from the queue
93+
*/
94+
public function testWaitForMessages()
95+
{
96+
if ($this->connectionType === 'amqp') {
97+
$this->markTestSkipped('STOMP test skipped because AMQP connection is available.
98+
This test is STOMP-specific.');
99+
}
100+
101+
$this->publisherConsumerController->stopConsumers();
102+
103+
$config = $this->config;
104+
$config['queue']['consumers_wait_for_messages'] = 1;
105+
$this->writeConfig($config);
106+
107+
$loadedConfig = $this->loadConfig();
108+
$this->assertArrayHasKey('queue', $loadedConfig);
109+
$this->assertArrayHasKey('consumers_wait_for_messages', $loadedConfig['queue']);
110+
$this->assertEquals(1, $loadedConfig['queue']['consumers_wait_for_messages']);
111+
112+
foreach ($this->messages as $message) {
113+
$this->publishMessage($message);
114+
}
115+
$this->publisherConsumerController->startConsumers();
116+
$this->waitForAsynchronousResult(count($this->messages), $this->logFilePath);
117+
118+
foreach ($this->messages as $item) {
119+
$this->assertStringContainsString($item, file_get_contents($this->logFilePath));
120+
}
121+
122+
$this->publishMessage('message4');
123+
$this->waitForAsynchronousResult(count($this->messages) + 1, $this->logFilePath);
124+
$this->assertStringContainsString('message4', file_get_contents($this->logFilePath));
125+
}
126+
127+
/**
128+
* Check if consumers do not wait for messages from the queue and die
129+
*/
130+
public function testNotWaitForMessages(): void
131+
{
132+
if ($this->connectionType !== 'stomp') {
133+
$this->markTestSkipped('STOMP test skipped because AMQP connection is available.
134+
This test is STOMP-specific.');
135+
}
136+
137+
$this->publisherConsumerController->stopConsumers();
138+
139+
$config = $this->config;
140+
$config['queue']['consumers_wait_for_messages'] = 0;
141+
$this->writeConfig($config);
142+
143+
$loadedConfig = $this->loadConfig();
144+
$this->assertArrayHasKey('queue', $loadedConfig);
145+
$this->assertArrayHasKey('consumers_wait_for_messages', $loadedConfig['queue']);
146+
$this->assertEquals(0, $loadedConfig['queue']['consumers_wait_for_messages']);
147+
foreach ($this->messages as $message) {
148+
$this->publishMessage($message);
149+
}
150+
151+
$this->publisherConsumerController->startConsumers();
152+
sleep(5);
153+
$this->waitForAsynchronousResult(count($this->messages), $this->logFilePath);
154+
155+
foreach ($this->messages as $item) {
156+
$this->assertStringContainsString($item, file_get_contents($this->logFilePath));
157+
}
158+
159+
// Checks that consumers do not wait 4th message and die
160+
$consumersProcessIds = $this->publisherConsumerController->getConsumersProcessIds();
161+
$this->assertArrayHasKey('stomp.mixed.sync.and.async.queue.consumer', $consumersProcessIds);
162+
$this->assertEquals([], $consumersProcessIds['stomp.mixed.sync.and.async.queue.consumer']);
163+
}
164+
165+
/**
166+
* @param string $message
167+
*/
168+
private function publishMessage(string $message): void
169+
{
170+
$this->getMsgObject()->setValue($message);
171+
$this->getMsgObject()->setTextFilePath($this->logFilePath);
172+
$this->publisher->publish('stomp.multi.topic.queue.topic.c', $this->getMsgObject());
173+
}
174+
175+
/**
176+
* @return array
177+
*/
178+
private function loadConfig(): array
179+
{
180+
return $this->reader->load(ConfigFilePool::APP_ENV);
181+
}
182+
183+
/**
184+
* @param array $config
185+
*/
186+
private function writeConfig(array $config): void
187+
{
188+
$writer = $this->objectManager->get(Writer::class);
189+
$writer->saveConfig([ConfigFilePool::APP_ENV => $config], true);
190+
}
191+
192+
/**
193+
* @inheritdoc
194+
*/
195+
protected function tearDown(): void
196+
{
197+
parent::tearDown();
198+
$this->writeConfig($this->config);
199+
}
200+
}

0 commit comments

Comments
 (0)