Skip to content

Commit e8f071c

Browse files
authored
Added command queue:dynamic-reload. (#6979)
1 parent fc0f6fa commit e8f071c

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\AsyncQueue\Command;
14+
15+
use Hyperf\AsyncQueue\AnnotationJob;
16+
use Hyperf\AsyncQueue\Driver\ChannelConfig;
17+
use Hyperf\AsyncQueue\Driver\DriverFactory;
18+
use Hyperf\AsyncQueue\Driver\RedisDriver;
19+
use Hyperf\AsyncQueue\JobInterface;
20+
use Hyperf\AsyncQueue\JobMessage;
21+
use Hyperf\Codec\Json;
22+
use Hyperf\Command\Command as HyperfCommand;
23+
use Hyperf\Redis\RedisProxy;
24+
use Hyperf\Stringable\Str;
25+
use Hyperf\Support\Reflection\ClassInvoker;
26+
use Psr\Container\ContainerInterface;
27+
use Symfony\Component\Console\Input\InputArgument;
28+
use Symfony\Component\Console\Input\InputOption;
29+
30+
class DynamicReloadMessageCommand extends HyperfCommand
31+
{
32+
protected ContainerInterface $container;
33+
34+
public function __construct(ContainerInterface $container)
35+
{
36+
$this->container = $container;
37+
parent::__construct('queue:dynamic-reload');
38+
}
39+
40+
public function handle()
41+
{
42+
$name = $this->input->getArgument('name');
43+
$queue = $this->input->getOption('queue');
44+
$job = $this->input->getOption('job');
45+
$limit = (int) $this->input->getOption('limit');
46+
$reload = $this->input->getOption('reload');
47+
48+
$factory = $this->container->get(DriverFactory::class);
49+
$driver = $factory->get($name);
50+
if (! $driver instanceof RedisDriver) {
51+
$this->error("Don't support driver " . $driver::class);
52+
return 0;
53+
}
54+
55+
$ref = new ClassInvoker($driver);
56+
/** @phpstan-ignore-next-line */
57+
$redis = $ref->redis;
58+
/** @phpstan-ignore-next-line */
59+
$channel = $ref->channel;
60+
61+
if (! $reload) {
62+
$this->show($channel, $redis, $queue, $limit, $job);
63+
return 0;
64+
}
65+
66+
$this->reload($channel, $redis, $queue, $limit, $job);
67+
}
68+
69+
public function reload(ChannelConfig $channel, RedisProxy $redis, string $queue, int $limit, ?string $jobName = null): void
70+
{
71+
$index = 0;
72+
$key = $channel->get($queue);
73+
if (! $limit) {
74+
$limit = (int) $redis->llen($key);
75+
}
76+
77+
while (true) {
78+
$data = $redis->rPop($key);
79+
++$index;
80+
if (! $data) {
81+
break;
82+
}
83+
84+
/** @var JobMessage $jobMessage */
85+
$jobMessage = unserialize($data);
86+
$job = $jobMessage->job();
87+
88+
if ($job instanceof AnnotationJob) {
89+
$name = $job->class . '::' . $job->method;
90+
} else {
91+
$name = $job::class;
92+
}
93+
94+
if ($jobName === null || $name === $jobName) {
95+
$redis->lPush($channel->getWaiting(), $data);
96+
$this->output->writeln('Reload Job: ' . $name);
97+
} else {
98+
$redis->lPush($key, $data);
99+
$this->output->writeln('RePush Job: ' . $name);
100+
}
101+
102+
if ($index >= $limit) {
103+
return;
104+
}
105+
}
106+
}
107+
108+
public function show(ChannelConfig $channel, RedisProxy $redis, string $queue, int $limit, ?string $jobName = null)
109+
{
110+
$key = $channel->get($queue);
111+
$index = 0;
112+
while (true) {
113+
$data = $redis->lIndex($key, $index);
114+
++$index;
115+
if (! $data) {
116+
break;
117+
}
118+
119+
/** @var JobMessage $jobMessage */
120+
$jobMessage = unserialize($data);
121+
/** @var AnnotationJob|JobInterface $job */
122+
$job = $jobMessage->job();
123+
if ($job instanceof AnnotationJob) {
124+
$name = $job->class . '::' . $job->method;
125+
$params = Json::encode($job->params);
126+
} else {
127+
$name = $job::class;
128+
$params = Json::encode(get_object_vars($job));
129+
}
130+
131+
if (! $jobName || $jobName === $name) {
132+
$this->output->writeln('Job: ' . $name . ' [' . Str::limit($params, 1000) . ']');
133+
}
134+
135+
if ($limit > 0 && $index >= $limit) {
136+
return;
137+
}
138+
}
139+
}
140+
141+
protected function configure()
142+
{
143+
$this->setDescription('Reload all failed message into waiting queue.');
144+
$this->addArgument('name', InputArgument::OPTIONAL, 'The name of queue.', 'default');
145+
$this->addOption('queue', 'Q', InputOption::VALUE_OPTIONAL, 'The channel name of queue.', 'failed');
146+
$jobHelp = 'If you use job which implements JobInterface, you can input class name like `App\Job\FooJob`' . PHP_EOL;
147+
$jobHelp .= 'If you use annotation `Hyperf\AsyncQueue\Annotation\AsyncQueueMessage`, you can input `class::method` like `App\Service\FooService::handleJob`' . PHP_EOL;
148+
$jobHelp .= 'If you don\'t input job, the command only show the messages.';
149+
$this->addOption('job', 'J', InputOption::VALUE_OPTIONAL, 'The job name which will be reloaded to queue. ' . PHP_EOL . $jobHelp);
150+
$this->addOption('limit', 'L', InputOption::VALUE_OPTIONAL, 'The number of retrieved messages.');
151+
$this->addOption('reload', 'R', InputOption::VALUE_NONE, 'Whether to reload the message queue.');
152+
}
153+
}

src/ConfigProvider.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
namespace Hyperf\AsyncQueue;
1414

1515
use Hyperf\AsyncQueue\Aspect\AsyncQueueAspect;
16+
use Hyperf\AsyncQueue\Command\DynamicReloadMessageCommand;
1617
use Hyperf\AsyncQueue\Command\FlushFailedMessageCommand;
1718
use Hyperf\AsyncQueue\Command\InfoCommand;
1819
use Hyperf\AsyncQueue\Command\ReloadFailedMessageCommand;
@@ -29,6 +30,7 @@ public function __invoke(): array
2930
FlushFailedMessageCommand::class,
3031
InfoCommand::class,
3132
ReloadFailedMessageCommand::class,
33+
DynamicReloadMessageCommand::class,
3234
],
3335
'publish' => [
3436
[

0 commit comments

Comments
 (0)