Skip to content

Commit 22a258e

Browse files
authored
Added QueueHandleListener which can record running logs for async-queue. (#2671)
* Added QueueHandleListener for async-queue * Update Client.php * update
1 parent 402abd7 commit 22a258e

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"suggest": {
3333
"hyperf/di": "Required to use annotations.",
3434
"hyperf/event": "Required to dispatch a event.",
35+
"hyperf/logger": "Required to use QueueHandleListener.",
3536
"hyperf/process": "Auto register the consumer process for server."
3637
},
3738
"autoload": {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
namespace Hyperf\AsyncQueue\Listener;
13+
14+
use Hyperf\AsyncQueue\AnnotationJob;
15+
use Hyperf\AsyncQueue\Event\AfterHandle;
16+
use Hyperf\AsyncQueue\Event\BeforeHandle;
17+
use Hyperf\AsyncQueue\Event\Event;
18+
use Hyperf\AsyncQueue\Event\FailedHandle;
19+
use Hyperf\AsyncQueue\Event\RetryHandle;
20+
use Hyperf\Event\Contract\ListenerInterface;
21+
use Hyperf\Logger\LoggerFactory;
22+
use Psr\Container\ContainerInterface;
23+
use Psr\Log\LoggerInterface;
24+
25+
class QueueHandleListener implements ListenerInterface
26+
{
27+
/**
28+
* @var LoggerInterface
29+
*/
30+
protected $logger;
31+
32+
public function __construct(ContainerInterface $container)
33+
{
34+
$this->logger = $container->get(LoggerFactory::class)->get('queue');
35+
}
36+
37+
public function listen(): array
38+
{
39+
return [
40+
AfterHandle::class,
41+
BeforeHandle::class,
42+
FailedHandle::class,
43+
RetryHandle::class,
44+
];
45+
}
46+
47+
public function process(object $event)
48+
{
49+
if ($event instanceof Event && $event->message->job()) {
50+
$job = $event->message->job();
51+
$jobClass = get_class($job);
52+
if ($job instanceof AnnotationJob) {
53+
$jobClass = sprintf('Job[%s@%s]', $job->class, $job->method);
54+
}
55+
$date = date('Y-m-d H:i:s');
56+
57+
switch (true) {
58+
case $event instanceof BeforeHandle:
59+
$this->logger->info(sprintf('[%s] Processing %s.', $date, $jobClass));
60+
break;
61+
case $event instanceof AfterHandle:
62+
$this->logger->info(sprintf('[%s] Processed %s.', $date, $jobClass));
63+
break;
64+
case $event instanceof FailedHandle:
65+
$this->logger->error(sprintf('[%s] Failed %s.', $date, $jobClass));
66+
$this->logger->error((string) $event->getThrowable());
67+
break;
68+
case $event instanceof RetryHandle:
69+
$this->logger->warning(sprintf('[%s] Retried %s.', $date, $jobClass));
70+
break;
71+
}
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)