Skip to content

Commit a67b73d

Browse files
authored
Added ReloadChannelListener to reload timeout or failed channels automatically. (#2459)
* Added `ReloadChannelListener` to reload timeout or failed channels automatically. * Added test cases.
1 parent 42af270 commit a67b73d

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
namespace Hyperf\AsyncQueue\Listener;
13+
14+
use Hyperf\AsyncQueue\Event\QueueLength;
15+
use Hyperf\Contract\StdoutLoggerInterface;
16+
use Hyperf\Event\Contract\ListenerInterface;
17+
18+
class ReloadChannelListener implements ListenerInterface
19+
{
20+
/**
21+
* @var StdoutLoggerInterface
22+
*/
23+
protected $logger;
24+
25+
/**
26+
* @var string[]
27+
*/
28+
protected $channels = [
29+
'timeout',
30+
];
31+
32+
public function __construct(StdoutLoggerInterface $logger)
33+
{
34+
$this->logger = $logger;
35+
}
36+
37+
public function listen(): array
38+
{
39+
return [
40+
QueueLength::class,
41+
];
42+
}
43+
44+
/**
45+
* @param QueueLength $event
46+
*/
47+
public function process(object $event)
48+
{
49+
if (! $event instanceof QueueLength) {
50+
return;
51+
}
52+
53+
if (! in_array($event->key, $this->channels)) {
54+
return;
55+
}
56+
57+
if ($event->length == 0) {
58+
return;
59+
}
60+
61+
$event->driver->reload($event->key);
62+
63+
$this->logger->info(sprintf('%s channel reload %d messages to waiting channel success.', $event->key, $event->length));
64+
}
65+
}

tests/ListenerTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
namespace HyperfTest\AsyncQueue;
13+
14+
use Hyperf\AsyncQueue\Driver\DriverInterface;
15+
use Hyperf\AsyncQueue\Event\QueueLength;
16+
use Hyperf\AsyncQueue\Listener\ReloadChannelListener;
17+
use Hyperf\Contract\StdoutLoggerInterface;
18+
use Mockery;
19+
use PHPUnit\Framework\TestCase;
20+
21+
/**
22+
* @internal
23+
* @coversNothing
24+
*/
25+
class ListenerTest extends TestCase
26+
{
27+
protected function tearDown()
28+
{
29+
Mockery::close();
30+
}
31+
32+
public function testReloadChannelListener()
33+
{
34+
$logger = Mockery::mock(StdoutLoggerInterface::class);
35+
$logger->shouldReceive('info')->withAnyArgs()->once()->andReturnUsing(function ($message) {
36+
$this->assertSame('timeout channel reload 10 messages to waiting channel success.', $message);
37+
});
38+
$driver = Mockery::mock(DriverInterface::class);
39+
$driver->shouldReceive('reload')->withAnyArgs()->andReturn(0);
40+
$listener = new ReloadChannelListener($logger);
41+
$listener->process(new QueueLength($driver, 'failed', 10));
42+
$listener->process(new QueueLength($driver, 'timeout', 10));
43+
$listener->process(new QueueLength($driver, 'timeout', 0));
44+
}
45+
}

0 commit comments

Comments
 (0)