Skip to content

Commit f2cf1dd

Browse files
authored
Fixed deprecated for Hyperf\AsyncQueue\Message with Serializable. (#4339)
1 parent 17d9a69 commit f2cf1dd

File tree

6 files changed

+254
-5
lines changed

6 files changed

+254
-5
lines changed

src/Driver/RedisDriver.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Hyperf\AsyncQueue\Exception\InvalidQueueException;
1515
use Hyperf\AsyncQueue\JobInterface;
16+
use Hyperf\AsyncQueue\JobMessage;
1617
use Hyperf\AsyncQueue\Message;
1718
use Hyperf\AsyncQueue\MessageInterface;
1819
use Hyperf\Redis\RedisFactory;
@@ -62,7 +63,7 @@ public function __construct(ContainerInterface $container, $config)
6263

6364
public function push(JobInterface $job, int $delay = 0): bool
6465
{
65-
$message = make(Message::class, [$job]);
66+
$message = make(JobMessage::class, [$job]);
6667
$data = $this->packer->pack($message);
6768

6869
if ($delay === 0) {
@@ -74,7 +75,7 @@ public function push(JobInterface $job, int $delay = 0): bool
7475

7576
public function delete(JobInterface $job): bool
7677
{
77-
$message = make(Message::class, [$job]);
78+
$message = make(JobMessage::class, [$job]);
7879
$data = $this->packer->pack($message);
7980

8081
return (bool) $this->redis->zRem($this->channel->getDelayed(), $data);

src/JobMessage.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
namespace Hyperf\AsyncQueue;
13+
14+
use Hyperf\Contract\CompressInterface;
15+
use Hyperf\Contract\UnCompressInterface;
16+
17+
class JobMessage implements MessageInterface
18+
{
19+
protected int $attempts = 0;
20+
21+
public function __construct(protected JobInterface $job)
22+
{
23+
}
24+
25+
public function __serialize(): array
26+
{
27+
if ($this->job instanceof CompressInterface) {
28+
/* @phpstan-ignore-next-line */
29+
$this->job = $this->job->compress();
30+
}
31+
32+
return [$this->job, $this->attempts];
33+
}
34+
35+
public function __unserialize(array $data): void
36+
{
37+
[$job, $attempts] = $data;
38+
if ($job instanceof UnCompressInterface) {
39+
$job = $job->uncompress();
40+
}
41+
42+
$this->job = $job;
43+
$this->attempts = $attempts;
44+
}
45+
46+
public function job(): JobInterface
47+
{
48+
return $this->job;
49+
}
50+
51+
public function attempts(): bool
52+
{
53+
if ($this->job->getMaxAttempts() > $this->attempts++) {
54+
return true;
55+
}
56+
return false;
57+
}
58+
59+
public function getAttempts(): int
60+
{
61+
return $this->attempts;
62+
}
63+
}

src/Message.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
use Hyperf\Contract\UnCompressInterface;
1616
use Serializable;
1717

18+
/**
19+
* @deprecated in v3.1, please use JobMessage instead
20+
*/
1821
class Message implements MessageInterface, Serializable
1922
{
2023
protected int $attempts = 0;
@@ -23,6 +26,27 @@ public function __construct(protected JobInterface $job)
2326
{
2427
}
2528

29+
public function __serialize(): array
30+
{
31+
if ($this->job instanceof CompressInterface) {
32+
/* @phpstan-ignore-next-line */
33+
$this->job = $this->job->compress();
34+
}
35+
36+
return [$this->job, $this->attempts];
37+
}
38+
39+
public function __unserialize(array $data): void
40+
{
41+
[$job, $attempts] = $data;
42+
if ($job instanceof UnCompressInterface) {
43+
$job = $job->uncompress();
44+
}
45+
46+
$this->job = $job;
47+
$this->attempts = $attempts;
48+
}
49+
2650
public function job(): JobInterface
2751
{
2852
return $this->job;

tests/MessageTest.php

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
namespace HyperfTest\AsyncQueue;
13+
14+
use Hyperf\AsyncQueue\JobInterface;
15+
use Hyperf\AsyncQueue\JobMessage;
16+
use Hyperf\AsyncQueue\MessageInterface;
17+
use HyperfTest\AsyncQueue\Stub\DemoJob;
18+
use PHPUnit\Framework\TestCase;
19+
20+
/**
21+
* @internal
22+
* @coversNothing
23+
*/
24+
class MessageTest extends TestCase
25+
{
26+
public function testMessageSerialize()
27+
{
28+
system(__DIR__ . '/async_queue2.2.php');
29+
30+
$message = unserialize(file_get_contents(__DIR__ . '/message2.2.cache'));
31+
32+
$this->assertInstanceOf(MessageInterface::class, $message);
33+
$this->assertInstanceOf(JobInterface::class, $message->job());
34+
$this->assertInstanceOf(JobInterface::class, $message->job());
35+
$this->assertInstanceOf(DemoJob::class, $message->job());
36+
$this->assertSame(9501, $message->job()->id);
37+
38+
$serialized = serialize($message);
39+
$message = unserialize($serialized);
40+
$this->assertInstanceOf(MessageInterface::class, $message);
41+
$this->assertInstanceOf(JobInterface::class, $message->job());
42+
$this->assertInstanceOf(JobInterface::class, $message->job());
43+
$this->assertInstanceOf(DemoJob::class, $message->job());
44+
$this->assertSame(9501, $message->job()->id);
45+
}
46+
47+
public function testJobMessageSerialize()
48+
{
49+
$id = rand(0, 9999);
50+
$message = new JobMessage(
51+
new DemoJob($id)
52+
);
53+
54+
$this->assertInstanceOf(MessageInterface::class, $message);
55+
$this->assertInstanceOf(JobInterface::class, $message->job());
56+
$this->assertInstanceOf(JobInterface::class, $message->job());
57+
$this->assertInstanceOf(DemoJob::class, $message->job());
58+
$this->assertSame($id, $message->job()->id);
59+
}
60+
}

tests/RedisDriverTest.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Hyperf\AsyncQueue\Driver\ChannelConfig;
1515
use Hyperf\AsyncQueue\Driver\RedisDriver;
16+
use Hyperf\AsyncQueue\JobMessage;
1617
use Hyperf\AsyncQueue\Message;
1718
use Hyperf\Di\Container;
1819
use Hyperf\Redis\RedisFactory;
@@ -113,7 +114,7 @@ public function testAsyncQueueJobGenerate()
113114
$driver->push(new DemoJob($id, $model));
114115

115116
$serialized = (string) Context::get('test.async-queue.lpush.value');
116-
$this->assertSame(236, strlen($serialized));
117+
$this->assertSame(231, strlen($serialized));
117118

118119
/** @var Message $class */
119120
$class = $packer->unpack($serialized);
@@ -138,8 +139,8 @@ protected function getContainer()
138139
$container->shouldReceive('make')->with(ChannelConfig::class, Mockery::any())->andReturnUsing(function ($class, $args) {
139140
return new ChannelConfig($args['channel']);
140141
});
141-
$container->shouldReceive('make')->with(Message::class, Mockery::any())->andReturnUsing(function ($class, $args) {
142-
return new Message(...$args);
142+
$container->shouldReceive('make')->with(JobMessage::class, Mockery::any())->andReturnUsing(function ($class, $args) {
143+
return new JobMessage(...$args);
143144
});
144145
$container->shouldReceive('get')->with(RedisFactory::class)->andReturnUsing(function ($_) {
145146
$factory = Mockery::mock(RedisFactory::class);

tests/async_queue2.2.php

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#!/usr/bin/env php
2+
<?php
3+
declare(strict_types=1);
4+
5+
namespace HyperfTest\AsyncQueue\Stub {
6+
require_once __DIR__ . '/../../contract/src/UnCompressInterface.php';
7+
require_once __DIR__ . '/../../contract/src/CompressInterface.php';
8+
require_once __DIR__ . '/../src/MessageInterface.php';
9+
require_once __DIR__ . '/../src/JobInterface.php';
10+
require_once __DIR__ . '/../src/Job.php';
11+
12+
use Hyperf\AsyncQueue\Job;
13+
14+
class DemoJob extends Job
15+
{
16+
public $id;
17+
18+
public $model;
19+
20+
protected int $maxAttempts = 1;
21+
22+
public function __construct($id, $model = null)
23+
{
24+
$this->id = $id;
25+
$this->model = $model;
26+
}
27+
28+
public function handle()
29+
{
30+
}
31+
}
32+
}
33+
34+
namespace Hyperf\AsyncQueue {
35+
use Hyperf\Contract\CompressInterface;
36+
use Hyperf\Contract\UnCompressInterface;
37+
use Serializable;
38+
39+
class Message implements MessageInterface, Serializable
40+
{
41+
/**
42+
* @var CompressInterface|JobInterface|UnCompressInterface
43+
*/
44+
protected $job;
45+
46+
/**
47+
* @var int
48+
*/
49+
protected $attempts = 0;
50+
51+
public function __construct(JobInterface $job)
52+
{
53+
$this->job = $job;
54+
}
55+
56+
public function job(): JobInterface
57+
{
58+
return $this->job;
59+
}
60+
61+
public function attempts(): bool
62+
{
63+
if ($this->job->getMaxAttempts() > $this->attempts++) {
64+
return true;
65+
}
66+
return false;
67+
}
68+
69+
public function getAttempts(): int
70+
{
71+
return $this->attempts;
72+
}
73+
74+
public function serialize()
75+
{
76+
if ($this->job instanceof CompressInterface) {
77+
$this->job = $this->job->compress();
78+
}
79+
80+
return serialize([$this->job, $this->attempts]);
81+
}
82+
83+
public function unserialize($serialized)
84+
{
85+
[$job, $attempts] = unserialize($serialized);
86+
if ($job instanceof UnCompressInterface) {
87+
$job = $job->uncompress();
88+
}
89+
90+
$this->job = $job;
91+
$this->attempts = $attempts;
92+
}
93+
}
94+
95+
$message = new \Hyperf\AsyncQueue\Message(
96+
new \HyperfTest\AsyncQueue\Stub\DemoJob(9501)
97+
);
98+
99+
file_put_contents(__DIR__ . '/message2.2.cache', serialize($message));
100+
}

0 commit comments

Comments
 (0)