|
4 | 4 |
|
5 | 5 | namespace Stackkit\LaravelGoogleCloudTasksQueue; |
6 | 6 |
|
7 | | -use Google\ApiCore\ApiException; |
8 | 7 | use Google\Cloud\Tasks\V2\Client\CloudTasksClient; |
9 | 8 | use Illuminate\Container\Container; |
10 | 9 | use Illuminate\Queue\Worker; |
11 | 10 | use Illuminate\Queue\WorkerOptions; |
12 | | -use Throwable; |
13 | | - |
14 | | -use function Safe\json_decode; |
15 | 11 |
|
16 | 12 | class TaskHandler |
17 | 13 | { |
18 | | - /** |
19 | | - * @var array |
20 | | - */ |
21 | | - private $config; |
22 | | - |
23 | | - /** |
24 | | - * @var CloudTasksClient |
25 | | - */ |
26 | | - private $client; |
27 | | - |
28 | | - /** |
29 | | - * @var CloudTasksQueue |
30 | | - */ |
31 | | - private $queue; |
32 | | - |
33 | | - public function __construct(CloudTasksClient $client) |
| 14 | + private array $config; |
| 15 | + |
| 16 | + public function __construct(private readonly CloudTasksClient $client) |
34 | 17 | { |
35 | | - $this->client = $client; |
| 18 | + // |
36 | 19 | } |
37 | 20 |
|
38 | 21 | public function handle(?string $task = null): void |
39 | 22 | { |
40 | | - $task = json_decode((string) $task ?: request()->getContent(), assoc: true); |
| 23 | + $task = IncomingTask::fromJson($task ?: request()->getContent()); |
41 | 24 |
|
42 | | - $this->config = config('queue.connections.'.$task['internal']['connection']); |
43 | | - |
44 | | - $this->guard($task); |
| 25 | + if ($task->isEmpty()) { |
| 26 | + abort(422, 'Invalid task payload'); |
| 27 | + } |
45 | 28 |
|
46 | | - $this->handleTask($task); |
47 | | - } |
| 29 | + if (! CloudTasksApi::exists($task->taskName())) { |
| 30 | + abort(404); |
| 31 | + } |
48 | 32 |
|
49 | | - private function guard(array $task): void |
50 | | - { |
51 | | - $appEngine = ! empty($this->config['app_engine']); |
| 33 | + $config = config('queue.connections.'.$task->connection()); |
52 | 34 |
|
53 | | - if ($appEngine) { |
54 | | - // https://cloud.google.com/tasks/docs/creating-appengine-handlers#reading_task_request_headers |
55 | | - // "If your request handler finds any of the headers listed above, it can trust |
56 | | - // that the request is a Cloud Tasks request." |
57 | | - abort_if(empty(request()->header('X-AppEngine-TaskName')), 404); |
| 35 | + $this->config = is_array($config) ? $config : []; |
58 | 36 |
|
59 | | - return; |
60 | | - } |
61 | | - |
62 | | - if (config('cloud-tasks.disable_security_key_verification') !== true) { |
63 | | - abort_if(decrypt($task['internal']['securityKey']) !== $task['uuid'], 404); |
64 | | - } |
| 37 | + $this->run($task); |
65 | 38 | } |
66 | 39 |
|
67 | | - private function handleTask(array $task): void |
| 40 | + private function run(IncomingTask $task): void |
68 | 41 | { |
69 | | - $queue = new CloudTasksQueue( |
70 | | - config: $this->config, |
71 | | - client: $this->client, |
72 | | - ); |
73 | | - |
74 | | - $queue->setConnectionName($task['internal']['connection']); |
| 42 | + $queue = tap(new CloudTasksQueue($this->config, $this->client))->setConnectionName($task->connection()); |
75 | 43 |
|
76 | 44 | $job = new CloudTasksJob( |
77 | 45 | container: Container::getInstance(), |
78 | | - cloudTasksQueue: $queue, |
79 | | - job: $task, |
80 | | - connectionName: $task['internal']['connection'], |
81 | | - queue: $task['internal']['queue'], |
| 46 | + driver: $queue, |
| 47 | + job: $task->toArray(), |
| 48 | + connectionName: $task->connection(), |
| 49 | + queue: $task->queue(), |
82 | 50 | ); |
83 | 51 |
|
84 | | - try { |
85 | | - CloudTasksApi::getTask($task['internal']['taskName']); |
86 | | - } catch (Throwable $e) { |
87 | | - if ($e instanceof ApiException && in_array($e->getStatus(), ['NOT_FOUND', 'PRECONDITION_FAILED'])) { |
88 | | - abort(404); |
89 | | - } |
90 | | - |
91 | | - throw $e; |
92 | | - } |
93 | | - |
94 | 52 | $job->setAttempts($job->attempts() + 1); |
95 | 53 |
|
96 | 54 | tap(app('queue.worker'), fn (Worker $worker) => $worker->process( |
|
0 commit comments