Skip to content

Commit 106a392

Browse files
committed
Merge branch 'master' into 3.2-merge
2 parents 32966f9 + aab0415 commit 106a392

File tree

8 files changed

+136
-22
lines changed

8 files changed

+136
-22
lines changed

src/AnnotationJob.php

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,10 @@
1818

1919
class AnnotationJob extends Job
2020
{
21-
public string $class;
22-
23-
public string $method;
24-
2521
public array $params = [];
2622

27-
public function __construct(string $class, string $method, array $params, int $maxAttempts = 0)
23+
public function __construct(public string $class, public string $method, array $params, public int $maxAttempts = 0)
2824
{
29-
$this->class = $class;
30-
$this->method = $method;
31-
$this->maxAttempts = $maxAttempts;
3225
foreach ($params as $key => $value) {
3326
if ($value instanceof CompressInterface) {
3427
$value = $value->compress();
@@ -40,10 +33,9 @@ public function __construct(string $class, string $method, array $params, int $m
4033
public function handle()
4134
{
4235
$container = ApplicationContext::getContainer();
43-
44-
$class = $container->get($this->class);
45-
36+
$instance = $container->get($this->class);
4637
$params = [];
38+
4739
foreach ($this->params as $key => $value) {
4840
if ($value instanceof UnCompressInterface) {
4941
$value = $value->uncompress();
@@ -53,6 +45,8 @@ public function handle()
5345

5446
$container->get(Environment::class)->setAsyncQueue(true);
5547

56-
$class->{$this->method}(...$params);
48+
$method = $this->method;
49+
50+
(fn () => $this->{$method}(...$params))->call($instance);
5751
}
5852
}

src/Driver/Driver.php

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Hyperf\AsyncQueue\Event\QueueLength;
1919
use Hyperf\AsyncQueue\Event\RetryHandle;
2020
use Hyperf\AsyncQueue\MessageInterface;
21+
use Hyperf\AsyncQueue\Result;
2122
use Hyperf\Codec\Packer\PhpSerializerPacker;
2223
use Hyperf\Collection\Arr;
2324
use Hyperf\Contract\PackerInterface;
@@ -104,19 +105,31 @@ protected function checkQueueLength(): void
104105

105106
/**
106107
* @param mixed $data
107-
* @param MessageInterface $message
108+
* @param MessageInterface|mixed $message
108109
*/
109110
protected function getCallback($data, $message): callable
110111
{
111112
return function () use ($data, $message) {
112113
try {
113-
if ($message instanceof MessageInterface) {
114-
$this->event?->dispatch(new BeforeHandle($message));
115-
$message->job()->handle();
116-
$this->event?->dispatch(new AfterHandle($message));
114+
// If the message is invalid, just ack it.
115+
if (! $message instanceof MessageInterface) {
116+
$this->ack($data);
117+
return;
117118
}
118119

119-
$this->ack($data);
120+
$this->event?->dispatch(new BeforeHandle($message));
121+
122+
$result = $message->job()->handle();
123+
$result = $result instanceof Result ? $result : Result::ACK;
124+
125+
match ($result) {
126+
Result::REQUEUE => $this->remove($data) && $this->retry($data),
127+
Result::RETRY => $this->remove($data) && $message->attempts() && $this->retry($message),
128+
Result::DROP => $this->remove($data),
129+
Result::ACK => $this->ack($data),
130+
};
131+
132+
$this->event?->dispatch(new AfterHandle($message, $result));
120133
} catch (Throwable $ex) {
121134
if (isset($message, $data)) {
122135
if ($message->attempts() && $this->remove($data)) {

src/Event/AfterHandle.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@
1212

1313
namespace Hyperf\AsyncQueue\Event;
1414

15+
use Hyperf\AsyncQueue\MessageInterface;
16+
use Hyperf\AsyncQueue\Result;
17+
1518
class AfterHandle extends Event
1619
{
20+
public function __construct(MessageInterface $message, public Result $result)
21+
{
22+
parent::__construct($message);
23+
}
1724
}

src/JobInterface.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public function fail(Throwable $e): void;
2020

2121
/**
2222
* Handle the job.
23+
* @return mixed|never|Result
2324
*/
2425
public function handle();
2526

src/Process/ConsumerProcess.php

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@
1414

1515
use Hyperf\AsyncQueue\Driver\DriverFactory;
1616
use Hyperf\AsyncQueue\Driver\DriverInterface;
17+
use Hyperf\Contract\StdoutLoggerInterface;
1718
use Hyperf\Process\AbstractProcess;
1819
use Psr\Container\ContainerInterface;
1920

2021
class ConsumerProcess extends AbstractProcess
2122
{
22-
protected string $queue = 'default';
23+
protected string $pool = 'default';
2324

2425
protected DriverInterface $driver;
2526

@@ -29,11 +30,19 @@ public function __construct(ContainerInterface $container)
2930
{
3031
parent::__construct($container);
3132

33+
// compatible with older versions, will be removed in v3.2, use `$pool` instead.
34+
if (property_exists($this, 'queue')) {
35+
if ($container->has(StdoutLoggerInterface::class)) {
36+
$container->get(StdoutLoggerInterface::class)->warning(sprintf('The property "%s::$queue" is deprecated since v3.1 and will be removed in v3.2, use "%s::$pool" instead.', self::class, self::class));
37+
}
38+
$this->pool = $this->queue;
39+
}
40+
3241
$factory = $this->container->get(DriverFactory::class);
33-
$this->driver = $factory->get($this->queue);
34-
$this->config = $factory->getConfig($this->queue);
42+
$this->driver = $factory->get($this->pool);
43+
$this->config = $factory->getConfig($this->pool);
3544

36-
$this->name = "queue.{$this->queue}";
45+
$this->name = "queue.{$this->pool}";
3746
$this->nums = $this->config['processes'] ?? 1;
3847
}
3948

src/Result.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\AsyncQueue;
14+
15+
enum Result: string
16+
{
17+
/**
18+
* Acknowledge the message.
19+
*/
20+
case ACK = 'ack';
21+
22+
/**
23+
* Reject the message and requeue it.
24+
*/
25+
case REQUEUE = 'requeue';
26+
27+
/**
28+
* Retry the message.
29+
*/
30+
case RETRY = 'retry';
31+
32+
/**
33+
* Reject the message and drop it.
34+
*/
35+
case DROP = 'drop';
36+
}

tests/AsyncQueueAspectTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,21 @@
2020
use Hyperf\AsyncQueue\Environment;
2121
use Hyperf\Context\ApplicationContext;
2222
use Hyperf\Context\Context;
23+
use Hyperf\Coroutine\Waiter;
2324
use Hyperf\Di\Annotation\AnnotationCollector;
2425
use Hyperf\Di\Annotation\Aspect;
2526
use Hyperf\Di\Aop\Ast;
2627
use Hyperf\Di\ReflectionManager;
2728
use HyperfTest\AsyncQueue\Stub\FooProxy;
29+
use HyperfTest\AsyncQueue\Stub\FooService;
2830
use Mockery;
2931
use PHPUnit\Framework\Attributes\CoversNothing;
3032
use PHPUnit\Framework\Attributes\Group;
3133
use PHPUnit\Framework\TestCase;
3234
use Psr\Container\ContainerInterface;
3335

36+
use function Hyperf\Coroutine\wait;
37+
3438
/**
3539
* @internal
3640
* @coversNothing
@@ -84,6 +88,28 @@ public function testAsyncMessageVariadic()
8488
$this->assertSame([$id, $uuid, $data], Context::get(FooProxy::class));
8589
}
8690

91+
public function testAnnotationJob()
92+
{
93+
$container = Mockery::mock(ContainerInterface::class);
94+
ApplicationContext::setContainer($container);
95+
$container->shouldReceive('get')->with(Waiter::class)->andReturn(new Waiter());
96+
97+
wait(function () use ($container) {
98+
$container->shouldReceive('get')->with(FooService::class)->andReturn(new FooService());
99+
$container->shouldReceive('get')->with(Environment::class)->andReturn(new Environment());
100+
101+
$job = new AnnotationJob(FooService::class, 'test', []);
102+
$job->handle();
103+
104+
$this->assertSame(1, Context::get(FooService::class . '::test'));
105+
106+
$job = new AnnotationJob(FooService::class, 'foo', []);
107+
$job->handle();
108+
109+
$this->assertSame(1, Context::get(FooService::class . '::foo'));
110+
});
111+
}
112+
87113
protected function getContainer()
88114
{
89115
$container = Mockery::mock(ContainerInterface::class);

tests/Stub/FooService.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace HyperfTest\AsyncQueue\Stub;
14+
15+
use Hyperf\Context\Context;
16+
17+
class FooService
18+
{
19+
public function test()
20+
{
21+
Context::set(self::class . '::test', 1);
22+
}
23+
24+
protected function foo()
25+
{
26+
Context::set(self::class . '::foo', 1);
27+
}
28+
}

0 commit comments

Comments
 (0)