Skip to content

Commit 4579ae5

Browse files
committed
Enhance SqsServiceProvider
1 parent 6762df3 commit 4579ae5

File tree

1 file changed

+216
-97
lines changed

1 file changed

+216
-97
lines changed

src/Sqs/Queue.php

Lines changed: 216 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -5,182 +5,301 @@
55
namespace palPalani\SqsQueueReader\Sqs;
66

77
use Aws\Exception\AwsException;
8+
use Illuminate\Contracts\Queue\Job;
89
use Illuminate\Queue\Jobs\SqsJob;
910
use Illuminate\Queue\SqsQueue;
1011
use Illuminate\Support\Facades\Config;
1112
use Illuminate\Support\Str;
1213
use JsonException;
1314
use palPalani\SqsQueueReader\Jobs\DispatcherJob;
15+
use RuntimeException;
1416

1517
/**
16-
* Class CustomSqsQueue
18+
* Custom SQS Queue implementation for handling raw JSON payloads from external sources.
19+
*
20+
* This queue extends Laravel's SqsQueue to support:
21+
* - Raw JSON message processing
22+
* - Single and batch message handling
23+
* - Custom handler class routing based on queue configuration
1724
*/
1825
class Queue extends SqsQueue
1926
{
2027
/**
2128
* Create a payload string from the given job and data.
2229
*
23-
* @param object|string $job
24-
* @param string $queue
25-
* @param mixed $data
30+
* @param object|string $job The job instance or class name
31+
* @param ?string $queue The queue name
32+
* @param mixed $data Additional job data
2633
*
27-
* @throws JsonException
34+
* @throws JsonException When JSON encoding fails
2835
*/
29-
protected function createPayload($job, $queue = null, $data = '', $delay = null): string
36+
protected function createPayload($job, $queue = null, $data = ''): string
3037
{
3138
if (! $job instanceof DispatcherJob) {
3239
return parent::createPayload($job, $queue, $data);
3340
}
3441

35-
$handlerJob = $this->getClass($queue) . '@handle';
42+
if ($job->isPlain()) {
43+
return json_encode($job->getPayload(), JSON_THROW_ON_ERROR);
44+
}
45+
46+
$handlerClass = $this->getHandlerClass($queue);
3647

37-
return $job->isPlain() ? \json_encode($job->getPayload(), JSON_THROW_ON_ERROR) : \json_encode([
38-
'job' => $handlerJob,
48+
return json_encode([
49+
'job' => "{$handlerClass}@handle",
3950
'data' => $job->getPayload(),
4051
], JSON_THROW_ON_ERROR);
4152
}
4253

43-
private function getClass($queue = null): string
54+
/**
55+
* Get the handler class for the specified queue.
56+
*
57+
* @param ?string $queue The queue URL or name
58+
* @return string The fully qualified handler class name
59+
*/
60+
private function getHandlerClass(?string $queue = null): string
61+
{
62+
$queueId = $this->extractQueueId($queue);
63+
$handlers = Config::get('sqs-queue-reader.handlers', []);
64+
$defaultHandler = Config::get('sqs-queue-reader.default-handler');
65+
66+
if ($queueId && array_key_exists($queueId, $handlers)) {
67+
return $handlers[$queueId]['class'];
68+
}
69+
70+
return $defaultHandler['class'];
71+
}
72+
73+
/**
74+
* Extract queue ID from queue URL or return null for default queue.
75+
*
76+
* @param ?string $queue The queue URL or name
77+
* @return ?string The extracted queue ID
78+
*/
79+
private function extractQueueId(?string $queue): ?string
4480
{
4581
if (! $queue) {
46-
return Config::get('sqs-queue-reader.default-handler')['class'];
82+
return null;
4783
}
4884

49-
$queueId = explode('/', $queue);
50-
$queueId = array_pop($queueId);
85+
$parts = explode('/', $queue);
5186

52-
return (\array_key_exists($queueId, Config::get('sqs-queue-reader.handlers')))
53-
? Config::get('sqs-queue-reader.handlers')[$queueId]['class']
54-
: Config::get('sqs-queue-reader.default-handler')['class'];
87+
return array_pop($parts);
88+
}
89+
90+
/**
91+
* Get queue configuration for the specified queue.
92+
*
93+
* @param ?string $queue The queue URL or name
94+
* @return array{class: string, count: int} Queue configuration
95+
*/
96+
private function getQueueConfig(?string $queue): array
97+
{
98+
$queueId = $this->extractQueueId($queue);
99+
$handlers = Config::get('sqs-queue-reader.handlers', []);
100+
$defaultHandler = Config::get('sqs-queue-reader.default-handler');
101+
102+
if ($queueId && array_key_exists($queueId, $handlers)) {
103+
return $handlers[$queueId];
104+
}
105+
106+
return $defaultHandler;
55107
}
56108

57109
/**
58110
* Pop the next job off of the queue.
59111
*
60-
* @param string $queue
61-
* @return \Illuminate\Contracts\Queue\Job|null
112+
* @param ?string $queue The queue name
113+
* @return ?Job The next job or null if no jobs available
62114
*
63-
* @throws JsonException
115+
* @throws JsonException When JSON processing fails
116+
* @throws RuntimeException When SQS operation fails
64117
*/
65118
public function pop($queue = null)
66119
{
67-
$queue = $this->getQueue($queue);
68-
69-
$queueId = explode('/', $queue);
70-
$queueId = array_pop($queueId);
71-
72-
$count = (\array_key_exists($queueId, Config::get('sqs-queue-reader.handlers')))
73-
? Config::get('sqs-queue-reader.handlers')[$queueId]['count']
74-
: Config::get('sqs-queue-reader.default-handler')['count'];
120+
$queueUrl = $this->getQueue($queue);
121+
$queueConfig = $this->getQueueConfig($queueUrl);
75122

76123
try {
77-
$response = $this->sqs->receiveMessage([
78-
'QueueUrl' => $queue,
79-
'AttributeNames' => ['ApproximateReceiveCount'],
80-
'MaxNumberOfMessages' => $count,
81-
'MessageAttributeNames' => ['All'],
82-
]);
83-
84-
if (isset($response['Messages']) && count($response['Messages']) > 0) {
85-
$class = (\array_key_exists($queueId, $this->container['config']->get('sqs-queue-reader.handlers')))
86-
? $this->container['config']->get('sqs-queue-reader.handlers')[$queueId]['class']
87-
: $this->container['config']->get('sqs-queue-reader.default-handler')['class'];
88-
89-
if ($count === 1) {
90-
$response = $this->modifySinglePayload($response['Messages'][0], $class);
91-
} else {
92-
$response = $this->modifyMultiplePayload($response['Messages'], $class);
93-
}
94-
95-
return new SqsJob($this->container, $this->sqs, $response, $this->connectionName, $queue);
124+
$response = $this->receiveMessages($queueUrl, $queueConfig['count']);
125+
126+
if (empty($response['Messages'])) {
127+
return;
96128
}
97-
} catch (AwsException $e) {
98-
$msg = 'Line: ' . $e->getLine() . ', ' . $e->getFile() . ', ' . $e->getMessage();
99129

100-
throw new \RuntimeException('Aws SQS error: ' . $msg);
130+
$messages = $response['Messages'];
131+
$handlerClass = $queueConfig['class'];
132+
133+
$processedResponse = $this->processMessages($messages, $handlerClass);
134+
135+
return new SqsJob(
136+
$this->container,
137+
$this->sqs,
138+
$processedResponse,
139+
$this->connectionName,
140+
$queueUrl
141+
);
142+
} catch (AwsException $e) {
143+
throw new RuntimeException(
144+
sprintf(
145+
'AWS SQS error: %s (File: %s, Line: %d)',
146+
$e->getMessage(),
147+
$e->getFile(),
148+
$e->getLine()
149+
),
150+
$e->getCode(),
151+
$e
152+
);
101153
}
102154
}
103155

104156
/**
105-
* @throws JsonException
157+
* Receive messages from SQS queue.
158+
*
159+
* @param string $queueUrl The SQS queue URL
160+
* @param int $maxMessages Maximum number of messages to receive
161+
* @return array SQS response containing messages
162+
*
163+
* @throws AwsException When SQS operation fails
106164
*/
107-
private function modifySinglePayload(array|string $payload, string $class): array|string
165+
private function receiveMessages(string $queueUrl, int $maxMessages): array
108166
{
109-
if (! is_array($payload)) {
110-
$payload = \json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
111-
}
167+
$result = $this->sqs->receiveMessage([
168+
'QueueUrl' => $queueUrl,
169+
'AttributeNames' => ['ApproximateReceiveCount'],
170+
'MaxNumberOfMessages' => $maxMessages,
171+
'MessageAttributeNames' => ['All'],
172+
]);
112173

113-
$body = \json_decode($payload['Body'], true, 512, JSON_THROW_ON_ERROR);
174+
return $result->toArray();
175+
}
114176

115-
$payload['Body'] = \json_encode([
177+
/**
178+
* Process received messages into Laravel job format.
179+
*
180+
* @param array $messages Array of SQS messages
181+
* @param string $handlerClass The handler class name
182+
* @return array Processed message data
183+
*
184+
* @throws JsonException When JSON processing fails
185+
*/
186+
private function processMessages(array $messages, string $handlerClass): array
187+
{
188+
return count($messages) === 1
189+
? $this->processSingleMessage($messages[0], $handlerClass)
190+
: $this->processMultipleMessages($messages, $handlerClass);
191+
}
192+
193+
/**
194+
* Process a single SQS message into Laravel job format.
195+
*
196+
* @param array $message The SQS message data
197+
* @param string $handlerClass The handler class name
198+
* @return array Processed message data
199+
*
200+
* @throws JsonException When JSON processing fails
201+
*/
202+
private function processSingleMessage(array $message, string $handlerClass): array
203+
{
204+
$messageBody = $this->decodeMessageBody($message['Body']);
205+
206+
$message['Body'] = json_encode([
116207
'uuid' => (string) Str::uuid(),
117-
'job' => $class . '@handle',
118-
'data' => $body['data'] ?? $body,
208+
'job' => "{$handlerClass}@handle",
209+
'data' => $messageBody['data'] ?? $messageBody,
119210
], JSON_THROW_ON_ERROR);
120211

121-
return $payload;
212+
return $message;
122213
}
123214

124215
/**
125-
* @throws JsonException
216+
* Process multiple SQS messages into Laravel batch job format.
217+
*
218+
* @param array $messages Array of SQS message data
219+
* @param string $handlerClass The handler class name
220+
* @return array Processed batch message data
221+
*
222+
* @throws JsonException When JSON processing fails
126223
*/
127-
private function modifyMultiplePayload(array|string $payload, string $class): array
224+
private function processMultipleMessages(array $messages, string $handlerClass): array
128225
{
129-
if (! is_array($payload)) {
130-
$payload = \json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
131-
}
226+
$batchData = [];
227+
$lastMessage = end($messages);
132228

133-
$body = [];
134-
$attributes = [];
135-
$messageId = null;
136-
$receiptHandle = null;
137-
138-
foreach ($payload as $k => $item) {
139-
try {
140-
$message = \json_decode($item['Body'], true, 512, JSON_THROW_ON_ERROR);
141-
} catch (JsonException $e) {
142-
$message = [];
143-
}
229+
foreach ($messages as $index => $message) {
230+
$messageBody = $this->safeDecodeMessageBody($message['Body']);
144231

145-
$body[$k] = [
146-
'messages' => $message,
147-
'attributes' => $item['Attributes'],
232+
$batchData[$index] = [
233+
'messages' => $messageBody,
234+
'attributes' => $message['Attributes'] ?? [],
148235
'batchIds' => [
149-
'Id' => $item['MessageId'],
150-
'ReceiptHandle' => $item['ReceiptHandle'],
236+
'Id' => $message['MessageId'],
237+
'ReceiptHandle' => $message['ReceiptHandle'],
151238
],
152239
];
153-
$attributes = $item['Attributes'];
154-
$messageId = $item['MessageId'];
155-
$receiptHandle = $item['ReceiptHandle'];
156240
}
157241

158242
return [
159-
'MessageId' => $messageId,
160-
'ReceiptHandle' => $receiptHandle,
161-
'Body' => \json_encode([
243+
'MessageId' => $lastMessage['MessageId'],
244+
'ReceiptHandle' => $lastMessage['ReceiptHandle'],
245+
'Body' => json_encode([
162246
'uuid' => (string) Str::uuid(),
163-
'job' => $class . '@handle',
164-
'data' => $body,
247+
'job' => "{$handlerClass}@handle",
248+
'data' => $batchData,
165249
], JSON_THROW_ON_ERROR),
166-
'Attributes' => $attributes,
250+
'Attributes' => $lastMessage['Attributes'] ?? [],
167251
];
168252
}
169253

170254
/**
171-
* @param string $payload
172-
* @param string|null $queue
255+
* Decode message body JSON with error handling.
173256
*
174-
* @throws JsonException
257+
* @param string $messageBody The raw message body
258+
* @return array The decoded message data
259+
*
260+
* @throws JsonException When JSON decoding fails
175261
*/
176-
public function pushRaw($payload, $queue = null, array $options = []): mixed
262+
private function decodeMessageBody(string $messageBody): array
177263
{
178-
$payload = \json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
264+
return json_decode($messageBody, true, 512, JSON_THROW_ON_ERROR);
265+
}
179266

180-
if (isset($payload['data'], $payload['job'])) {
181-
$payload = $payload['data'];
267+
/**
268+
* Safely decode message body JSON, returning empty array on failure.
269+
*
270+
* @param string $messageBody The raw message body
271+
* @return array The decoded message data or empty array
272+
*/
273+
private function safeDecodeMessageBody(string $messageBody): array
274+
{
275+
try {
276+
return $this->decodeMessageBody($messageBody);
277+
} catch (JsonException) {
278+
return [];
182279
}
280+
}
281+
282+
/**
283+
* Push a raw payload onto the queue.
284+
*
285+
* @param string $payload The raw JSON payload
286+
* @param ?string $queue The queue name
287+
* @param array $options Additional options
288+
* @return mixed The result of the push operation
289+
*
290+
* @throws JsonException When JSON processing fails
291+
*/
292+
public function pushRaw($payload, $queue = null, array $options = [])
293+
{
294+
$decodedPayload = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
295+
296+
// Extract data from Laravel job format if present
297+
if (isset($decodedPayload['data'], $decodedPayload['job'])) {
298+
$decodedPayload = $decodedPayload['data'];
299+
}
300+
301+
$processedPayload = json_encode($decodedPayload, JSON_THROW_ON_ERROR);
183302

184-
return parent::pushRaw(\json_encode($payload, JSON_THROW_ON_ERROR), $queue, $options);
303+
return parent::pushRaw($processedPayload, $queue, $options);
185304
}
186305
}

0 commit comments

Comments
 (0)