Skip to content

Commit b5c9241

Browse files
committed
Add support for job timeouts
1 parent 0ba0b99 commit b5c9241

File tree

5 files changed

+116
-2
lines changed

5 files changed

+116
-2
lines changed

src/CloudTasksServiceProvider.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
namespace Stackkit\LaravelGoogleCloudTasksQueue;
66

77
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
8+
use Illuminate\Contracts\Debug\ExceptionHandler;
9+
use Illuminate\Foundation\Application;
810
use Illuminate\Queue\Events\JobExceptionOccurred;
911
use Illuminate\Queue\Events\JobFailed;
1012
use Illuminate\Queue\Events\JobProcessed;
@@ -29,6 +31,15 @@ private function registerClient(): void
2931
return new CloudTasksClient();
3032
});
3133

34+
$this->app->singleton('cloud-tasks.worker', function (Application $app) {
35+
return new Worker(
36+
$app['queue'],
37+
$app['events'],
38+
$app[ExceptionHandler::class],
39+
fn() => $app->isDownForMaintenance(),
40+
);
41+
});
42+
3243
$this->app->bind('cloud-tasks-api', CloudTasksApiConcrete::class);
3344
}
3445

src/TaskHandler.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
88
use Illuminate\Container\Container;
9-
use Illuminate\Queue\Worker;
109
use Illuminate\Queue\WorkerOptions;
1110

1211
class TaskHandler
@@ -51,7 +50,7 @@ private function run(IncomingTask $task): void
5150

5251
$job->setAttempts($job->attempts() + 1);
5352

54-
tap(app('queue.worker'), fn (Worker $worker) => $worker->process(
53+
tap(app('cloud-tasks.worker'), fn (Worker $worker) => $worker->process(
5554
connectionName: $job->getConnectionName(),
5655
job: $job,
5756
options: $this->getWorkerOptions()

src/Worker.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Stackkit\LaravelGoogleCloudTasksQueue;
6+
7+
use Illuminate\Queue\Worker as LaravelWorker;
8+
use Illuminate\Queue\WorkerOptions;
9+
10+
/**
11+
* Custom worker class to handle specific requirements for Google Cloud Tasks.
12+
*
13+
* This class modifies the behavior of the Laravel queue worker to better
14+
* integrate with Google Cloud Tasks, particularly focusing on job timeout
15+
* handling and graceful shutdowns to avoid interrupting the HTTP lifecycle.
16+
*
17+
* Firstly, the 'supportsAsyncSignals', 'listenForSignals', and 'registerTimeoutHandler' methods
18+
* are protected and called within the queue while(true) loop. We want (and need!) to have that
19+
* too in order to support job timeouts. So, to make it work, we create a public method that
20+
* can call the private signal methods.
21+
*
22+
* Secondly, we need to override the 'kill' method because it tends to kill the server process (artisan serve, octane),
23+
* as well as abort the HTTP request from Cloud Tasks. This is not the desired behavior.
24+
* Instead, it should just fire the WorkerStopped event and return a normal status code.
25+
*/
26+
class Worker extends LaravelWorker
27+
{
28+
public function process($connectionName, $job, WorkerOptions $options)
29+
{
30+
if ($this->supportsAsyncSignals()) {
31+
$this->listenForSignals();
32+
33+
$this->registerTimeoutHandler($job, $options);
34+
}
35+
36+
return parent::process($connectionName, $job, $options);
37+
}
38+
39+
public function kill($status = 0, $options = null): void
40+
{
41+
parent::stop($status, $options);
42+
43+
// When running tests, we cannot run exit because it will kill the PHPunit process.
44+
// So, to still test that the application has exited, we will simply rely on the
45+
// WorkerStopped event that is fired when the worker is stopped.
46+
if (app()->runningUnitTests()) {
47+
return;
48+
}
49+
50+
exit($status);
51+
}
52+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Support;
6+
7+
use Illuminate\Queue\Events\WorkerStopping;
8+
use Illuminate\Support\Facades\Event;
9+
10+
class SimpleJobWithTimeout extends SimpleJob
11+
{
12+
public $timeout = 3;
13+
14+
public function handle()
15+
{
16+
Event::listen(WorkerStopping::class, function () {
17+
event(new JobOutput('SimpleJobWithTimeout:worker-stopping'));
18+
});
19+
20+
event(new JobOutput('SimpleJobWithTimeout:1'));
21+
sleep(1);
22+
event(new JobOutput('SimpleJobWithTimeout:2'));
23+
sleep(1);
24+
event(new JobOutput('SimpleJobWithTimeout:3'));
25+
sleep(1);
26+
event(new JobOutput('SimpleJobWithTimeout:4'));
27+
sleep(1);
28+
event(new JobOutput('SimpleJobWithTimeout:5'));
29+
}
30+
}

tests/TaskHandlerTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Tests\Support\FailingJobWithUnlimitedTries;
1818
use Tests\Support\JobOutput;
1919
use Tests\Support\SimpleJob;
20+
use Tests\Support\SimpleJobWithTimeout;
2021

2122
class TaskHandlerTest extends TestCase
2223
{
@@ -248,4 +249,25 @@ public function retried_jobs_get_a_new_name()
248249
$this->assertCount(2, $this->createdTasks);
249250
$this->assertNotEquals($this->createdTasks[0]->getName(), $this->createdTasks[1]->getName());
250251
}
252+
253+
#[Test]
254+
public function test_job_timeout()
255+
{
256+
// Arrange
257+
Event::fake(JobOutput::class);
258+
259+
// Act
260+
$this->dispatch(new SimpleJobWithTimeout())->run();
261+
262+
// Assert
263+
$events = Event::dispatched(JobOutput::class)->map(fn($event) => $event[0]->output)->toArray();
264+
$this->assertEquals([
265+
'SimpleJobWithTimeout:1',
266+
'SimpleJobWithTimeout:2',
267+
'SimpleJobWithTimeout:3',
268+
'SimpleJobWithTimeout:worker-stopping',
269+
'SimpleJobWithTimeout:4',
270+
'SimpleJobWithTimeout:5',
271+
], $events);
272+
}
251273
}

0 commit comments

Comments
 (0)