Skip to content

Commit 9345e46

Browse files
authored
Added result handling for async queue jobs with Hyperf\AsyncQueue\Result enum.
1 parent f059f72 commit 9345e46

File tree

3 files changed

+52
-3
lines changed

3 files changed

+52
-3
lines changed

src/Driver/Driver.php

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Hyperf\AsyncQueue\Event\QueueLength;
1919
use Hyperf\AsyncQueue\Event\RetryHandle;
2020
use Hyperf\AsyncQueue\MessageInterface;
21+
use Hyperf\AsyncQueue\Result;
2122
use Hyperf\Codec\Packer\PhpSerializerPacker;
2223
use Hyperf\Collection\Arr;
2324
use Hyperf\Contract\PackerInterface;
@@ -112,11 +113,22 @@ protected function getCallback($data, $message): callable
112113
try {
113114
if ($message instanceof MessageInterface) {
114115
$this->event?->dispatch(new BeforeHandle($message));
115-
$message->job()->handle();
116+
117+
$result = $message->job()->handle();
118+
119+
match ($result) {
120+
Result::REQUEUE => $this->remove($data) && $this->retry($data),
121+
Result::RETRY => $this->remove($data) && $message->attempts() && $this->retry($message),
122+
Result::DROP => $this->remove($data),
123+
Result::ACK => $this->ack($data),
124+
default => $this->ack($data),
125+
};
126+
116127
$this->event?->dispatch(new AfterHandle($message));
128+
} else {
129+
// If the message is invalid, just ack it.
130+
$this->ack($data);
117131
}
118-
119-
$this->ack($data);
120132
} catch (Throwable $ex) {
121133
if (isset($message, $data)) {
122134
if ($message->attempts() && $this->remove($data)) {

src/JobInterface.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public function fail(Throwable $e): void;
2020

2121
/**
2222
* Handle the job.
23+
* @return mixed|never|Result
2324
*/
2425
public function handle();
2526

src/Result.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\AsyncQueue;
14+
15+
enum Result: string
16+
{
17+
/**
18+
* Acknowledge the message.
19+
*/
20+
case ACK = 'ack';
21+
22+
/**
23+
* Reject the message and requeue it.
24+
*/
25+
case REQUEUE = 'requeue';
26+
27+
/**
28+
* Retry the message.
29+
*/
30+
case RETRY = 'retry';
31+
32+
/**
33+
* Reject the message and drop it.
34+
*/
35+
case DROP = 'drop';
36+
}

0 commit comments

Comments
 (0)