Skip to content

Commit 7852266

Browse files
committed
Added argument queue for method flush and reload.
1 parent 9625a93 commit 7852266

File tree

6 files changed

+59
-8
lines changed

6 files changed

+59
-8
lines changed

src/Command/FlushFailedMessageCommand.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Console\Command\Command as SymfonyCommand;
1919
use Symfony\Component\Console\Input\InputArgument;
2020
use Symfony\Component\Console\Input\InputInterface;
21+
use Symfony\Component\Console\Input\InputOption;
2122
use Symfony\Component\Console\Output\OutputInterface;
2223

2324
/**
@@ -39,10 +40,12 @@ public function __construct(ContainerInterface $container)
3940
public function execute(InputInterface $input, OutputInterface $output)
4041
{
4142
$name = $input->getArgument('name');
43+
$queue = $input->getOption('queue');
44+
4245
$factory = $this->container->get(DriverFactory::class);
4346
$driver = $factory->get($name);
4447

45-
$driver->flush();
48+
$driver->flush($queue);
4649

4750
$output->writeln('<fg=red>Flush all message from failed queue.</>');
4851
}
@@ -51,5 +54,6 @@ protected function configure()
5154
{
5255
$this->setDescription('Delete all message from failed queue.');
5356
$this->addArgument('name', InputArgument::OPTIONAL, 'The name of queue.', 'default');
57+
$this->addOption('queue', 'Q', InputOption::VALUE_OPTIONAL, 'The channel name of queue.');
5458
}
5559
}

src/Command/ReloadFailedMessageCommand.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Console\Command\Command as SymfonyCommand;
1919
use Symfony\Component\Console\Input\InputArgument;
2020
use Symfony\Component\Console\Input\InputInterface;
21+
use Symfony\Component\Console\Input\InputOption;
2122
use Symfony\Component\Console\Output\OutputInterface;
2223

2324
/**
@@ -39,10 +40,12 @@ public function __construct(ContainerInterface $container)
3940
public function execute(InputInterface $input, OutputInterface $output)
4041
{
4142
$name = $input->getArgument('name');
43+
$queue = $input->getOption('queue');
44+
4245
$factory = $this->container->get(DriverFactory::class);
4346
$driver = $factory->get($name);
4447

45-
$num = $driver->reload();
48+
$num = $driver->reload($queue);
4649

4750
$output->writeln(sprintf('<fg=green>Reload %d failed message into waiting queue.</>', $num));
4851
}
@@ -51,5 +54,6 @@ protected function configure()
5154
{
5255
$this->setDescription('Reload all failed message into waiting queue.');
5356
$this->addArgument('name', InputArgument::OPTIONAL, 'The name of queue.', 'default');
57+
$this->addOption('queue', 'Q', InputOption::VALUE_OPTIONAL, 'The channel name of queue.');
5458
}
5559
}

src/Driver/ChannelConfig.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
namespace Hyperf\AsyncQueue\Driver;
1414

15+
use Hyperf\AsyncQueue\Exception\InvalidQueueException;
16+
1517
class ChannelConfig
1618
{
1719
/**
@@ -64,6 +66,15 @@ public function __construct(string $channel)
6466
$this->timeout = "{$channel}:timeout";
6567
}
6668

69+
public function get(string $queue)
70+
{
71+
if (isset($this->{$queue}) && is_string($this->{$queue})) {
72+
return $this->{$queue};
73+
}
74+
75+
throw new InvalidQueueException(sprintf('Queue %s is not exist.', $queue));
76+
}
77+
6778
/**
6879
* @return string
6980
*/

src/Driver/DriverInterface.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ public function consume(): void;
5353
/**
5454
* Reload failed message into waiting queue.
5555
*/
56-
public function reload(): int;
56+
public function reload(string $queue = null): int;
5757

5858
/**
5959
* Delete all failed message from failed queue.
6060
*/
61-
public function flush(): bool;
61+
public function flush(string $queue = null): bool;
6262

6363
/**
6464
* Return info for current queue.

src/Driver/RedisDriver.php

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
namespace Hyperf\AsyncQueue\Driver;
1414

15+
use Hyperf\AsyncQueue\Exception\InvalidQueueException;
1516
use Hyperf\AsyncQueue\JobInterface;
1617
use Hyperf\AsyncQueue\Message;
1718
use Hyperf\AsyncQueue\MessageInterface;
@@ -113,18 +114,32 @@ public function fail($data): bool
113114
return false;
114115
}
115116

116-
public function reload(): int
117+
public function reload(string $queue = null): int
117118
{
119+
$channel = $this->channel->getFailed();
120+
if ($queue) {
121+
if (! in_array($queue, ['timeout', 'failed'])) {
122+
throw new InvalidQueueException(sprintf('Queue %s is not supported.', $queue));
123+
}
124+
125+
$channel = $this->channel->get($queue);
126+
}
127+
118128
$num = 0;
119-
while ($this->redis->rpoplpush($this->channel->getFailed(), $this->channel->getWaiting())) {
129+
while ($this->redis->rpoplpush($channel, $this->channel->getWaiting())) {
120130
++$num;
121131
}
122132
return $num;
123133
}
124134

125-
public function flush(): bool
135+
public function flush(string $queue = null): bool
126136
{
127-
return (bool) $this->redis->delete($this->channel->getFailed());
137+
$channel = $this->channel->getFailed();
138+
if ($queue) {
139+
$channel = $this->channel->get($queue);
140+
}
141+
142+
return (bool) $this->redis->delete($channel);
128143
}
129144

130145
public function info(): array
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://hyperf.io
8+
* @document https://doc.hyperf.io
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\AsyncQueue\Exception;
14+
15+
class InvalidQueueException extends \RuntimeException
16+
{
17+
}

0 commit comments

Comments
 (0)