Skip to content

Commit 7c6b230

Browse files
committed
Fixed bug that async-queue broken caused by uncompress model failed.
1 parent 5ce9c99 commit 7c6b230

File tree

4 files changed

+45
-20
lines changed

4 files changed

+45
-20
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"php": ">=7.2",
1919
"psr/container": "^1.0|^2.0",
2020
"psr/event-dispatcher": "^1.0",
21-
"hyperf/contract": "~2.2.0",
21+
"hyperf/contract": "~2.2.8",
2222
"hyperf/command": "~2.2.0",
2323
"hyperf/utils": "~2.2.0"
2424
},

src/Driver/Driver.php

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Hyperf\AsyncQueue\Exception\InvalidPackerException;
2020
use Hyperf\AsyncQueue\MessageInterface;
2121
use Hyperf\Contract\PackerInterface;
22+
use Hyperf\Contract\StdoutLoggerInterface;
2223
use Hyperf\Process\ProcessManager;
2324
use Hyperf\Utils\Arr;
2425
use Hyperf\Utils\Coroutine\Concurrent;
@@ -81,29 +82,34 @@ public function consume(): void
8182
$maxMessages = Arr::get($this->config, 'max_messages', 0);
8283

8384
while (ProcessManager::isRunning()) {
84-
[$data, $message] = $this->pop();
85+
try {
86+
[$data, $message] = $this->pop();
8587

86-
if ($data === false) {
87-
continue;
88-
}
88+
if ($data === false) {
89+
continue;
90+
}
8991

90-
$callback = $this->getCallback($data, $message);
92+
$callback = $this->getCallback($data, $message);
9193

92-
if ($this->concurrent instanceof Concurrent) {
93-
$this->concurrent->create($callback);
94-
} else {
95-
parallel([$callback]);
96-
}
94+
if ($this->concurrent instanceof Concurrent) {
95+
$this->concurrent->create($callback);
96+
} else {
97+
parallel([$callback]);
98+
}
9799

98-
if ($messageCount % $this->lengthCheckCount === 0) {
99-
$this->checkQueueLength();
100-
}
100+
if ($messageCount % $this->lengthCheckCount === 0) {
101+
$this->checkQueueLength();
102+
}
101103

102-
if ($maxMessages > 0 && $messageCount >= $maxMessages) {
103-
break;
104+
if ($maxMessages > 0 && $messageCount >= $maxMessages) {
105+
break;
106+
}
107+
} catch (\Throwable $exception) {
108+
$logger = $this->container->get(StdoutLoggerInterface::class);
109+
$logger->error((string) $exception);
110+
} finally {
111+
++$messageCount;
104112
}
105-
106-
++$messageCount;
107113
}
108114
}
109115

tests/RedisDriverTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,22 @@ public function testDemoModelGenerate()
8383
$this->assertEquals($model, $model2);
8484
}
8585

86+
public function testDemoModelUnCompressToNull()
87+
{
88+
$content = Str::random(1000);
89+
90+
$model = new DemoModel(9999, 'Hyperf', 1, $content);
91+
$s1 = serialize($model);
92+
$this->assertSame(1131, strlen($s1));
93+
94+
$meta = $model->compress();
95+
$s2 = serialize($meta);
96+
$this->assertSame(68, strlen($s2));
97+
$this->assertInstanceOf(DemoModelMeta::class, $meta);
98+
99+
$this->assertNull($meta->uncompress());
100+
}
101+
86102
public function testAsyncQueueJobGenerate()
87103
{
88104
$container = $this->getContainer();

tests/Stub/DemoModelMeta.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
*/
1212
namespace HyperfTest\AsyncQueue\Stub;
1313

14-
use Hyperf\Contract\CompressInterface;
1514
use Hyperf\Contract\UnCompressInterface;
1615
use Hyperf\Utils\Context;
1716

@@ -24,10 +23,14 @@ public function __construct($id)
2423
$this->id = $id;
2524
}
2625

27-
public function uncompress(): CompressInterface
26+
public function uncompress()
2827
{
2928
$data = Context::get('test.async-queue.demo.model.' . $this->id);
3029

30+
if ($this->id === 9999) {
31+
return null;
32+
}
33+
3134
return new DemoModel($this->id, ...$data);
3235
}
3336
}

0 commit comments

Comments
 (0)