Skip to content

Commit f94bfd8

Browse files
authored
feat(async-queue): add automatic consumer process registration based on configuration (#7618)
1 parent 106a392 commit f94bfd8

File tree

4 files changed

+106
-0
lines changed

4 files changed

+106
-0
lines changed

publish/async_queue.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
return [
1515
'default' => [
16+
'enable' => true, // Whether to enable auto register consumer process.
1617
'driver' => RedisDriver::class,
1718
'redis' => [
1819
'pool' => 'default',

src/ConfigProvider.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public function __invoke(): array
3232
ReloadFailedMessageCommand::class,
3333
DynamicReloadMessageCommand::class,
3434
],
35+
'listeners' => [
36+
Listener\RegisterConsumerProcessesListener::class,
37+
],
3538
'publish' => [
3639
[
3740
'id' => 'config',

src/ConsumerManager.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\AsyncQueue;
14+
15+
use Hyperf\AsyncQueue\Driver\DriverFactory;
16+
use Hyperf\Contract\ConfigInterface;
17+
use Hyperf\Process\AbstractProcess;
18+
use Hyperf\Process\ProcessManager;
19+
use Psr\Container\ContainerInterface;
20+
21+
class ConsumerManager
22+
{
23+
public function __construct(protected ContainerInterface $container)
24+
{
25+
}
26+
27+
public function run(): void
28+
{
29+
$config = $this->container->get(ConfigInterface::class);
30+
$pools = $config->get('async_queue', []);
31+
32+
foreach ($pools as $pool => $config) {
33+
if (! ($config['enable'] ?? false)) {
34+
continue;
35+
}
36+
37+
$this->createProcess($pool, $config);
38+
}
39+
}
40+
41+
protected function createProcess(string $pool, array $config): void
42+
{
43+
$process = new class($this->container, $pool, $config) extends AbstractProcess {
44+
public function __construct(
45+
protected ContainerInterface $container,
46+
protected string $pool,
47+
array $config
48+
) {
49+
parent::__construct($container);
50+
$this->name = "queue.{$pool}";
51+
$this->nums = $config['processes'] ?? 1;
52+
}
53+
54+
public function handle(): void
55+
{
56+
$driver = $this->container->get(DriverFactory::class)->get($this->pool);
57+
$driver->consume();
58+
}
59+
};
60+
61+
ProcessManager::register($process);
62+
}
63+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\AsyncQueue\Listener;
14+
15+
use Hyperf\AsyncQueue\ConsumerManager;
16+
use Hyperf\Event\Contract\ListenerInterface;
17+
use Hyperf\Framework\Event\BeforeMainServerStart;
18+
use Hyperf\Server\Event\MainCoroutineServerStart;
19+
use Psr\Container\ContainerInterface;
20+
21+
class RegisterConsumerProcessesListener implements ListenerInterface
22+
{
23+
public function __construct(protected ContainerInterface $container)
24+
{
25+
}
26+
27+
public function listen(): array
28+
{
29+
return [
30+
BeforeMainServerStart::class,
31+
MainCoroutineServerStart::class,
32+
];
33+
}
34+
35+
public function process(object $event): void
36+
{
37+
$this->container->get(ConsumerManager::class)->run();
38+
}
39+
}

0 commit comments

Comments
 (0)