Skip to content

Commit 6762df3

Browse files
committed
Enhance SqsServiceProvider
1 parent d17f601 commit 6762df3

File tree

1 file changed

+195
-63
lines changed

1 file changed

+195
-63
lines changed

src/SqsQueueReaderServiceProvider.php

Lines changed: 195 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -6,100 +6,232 @@
66

77
use Aws\Exception\AwsException;
88
use Aws\Sqs\SqsClient;
9+
use Illuminate\Contracts\Config\Repository as ConfigRepository;
10+
use Illuminate\Contracts\Events\Dispatcher;
11+
use Illuminate\Contracts\Queue\Job;
912
use Illuminate\Queue\Events\JobProcessed;
13+
use Illuminate\Queue\QueueManager;
1014
use Illuminate\Support\Arr;
11-
use Illuminate\Support\Facades\Config;
1215
use Illuminate\Support\Facades\Log;
13-
use Illuminate\Support\Facades\Queue;
1416
use Illuminate\Support\ServiceProvider;
17+
use InvalidArgumentException;
1518
use palPalani\SqsQueueReader\Sqs\Connector;
19+
use RuntimeException;
1620

1721
class SqsQueueReaderServiceProvider extends ServiceProvider
1822
{
23+
private const CONFIG_KEY = 'sqs-queue-reader';
24+
25+
private const CONFIG_PATH = __DIR__ . '/../config/sqs-queue-reader.php';
26+
27+
private const SQS_VERSION = '2012-11-05';
28+
29+
private const BATCH_SIZE = 10;
30+
31+
private const DEFAULT_TIMEOUT = 30;
32+
1933
public function boot(): void
2034
{
2135
if ($this->app->runningInConsole()) {
22-
$this->publishes([
23-
__DIR__ . '/../config/sqs-queue-reader.php' => config_path('sqs-queue-reader.php'),
24-
], 'config');
25-
26-
Queue::after(function (JobProcessed $event) {
27-
$connections = Config::get('queue.connections');
28-
if (\in_array($event->connectionName, array_keys($connections), true)) {
29-
$queue = $event->job->getQueue();
30-
31-
$queueId = explode('/', $queue);
32-
$queueId = array_pop($queueId);
33-
34-
$count = (\array_key_exists($queueId, Config::get('sqs-queue-reader.handlers')))
35-
? Config::get('sqs-queue-reader.handlers')[$queueId]['count']
36-
: Config::get('sqs-queue-reader.default-handler')['count'];
37-
38-
if ($count === 1) {
39-
$event->job->delete();
40-
} else {
41-
$this->removeMessages($event->job->payload(), $queue, $event->connectionName);
42-
}
43-
}
44-
});
36+
$this->publishConfiguration();
4537
}
38+
39+
$this->registerJobEventListener();
4640
}
4741

4842
public function register(): void
4943
{
50-
$this->mergeConfigFrom(__DIR__ . '/../config/sqs-queue-reader.php', 'sqs-queue-reader');
44+
$this->mergeConfigFrom(self::CONFIG_PATH, self::CONFIG_KEY);
45+
46+
$this->app->booted(fn() => $this->registerQueueDriver());
47+
}
48+
49+
private function publishConfiguration(): void
50+
{
51+
$this->publishes([
52+
self::CONFIG_PATH => config_path(self::CONFIG_KEY . '.php'),
53+
], 'config');
54+
}
5155

52-
$this->app->booted(function () {
53-
$this->app['queue']->extend('sqs-json', static function () {
54-
return new Connector;
55-
});
56-
});
56+
private function registerQueueDriver(): void
57+
{
58+
/** @var QueueManager $queueManager */
59+
$queueManager = $this->app['queue'];
60+
61+
$queueManager->extend('sqs-json', static fn() => new Connector);
5762
}
5863

59-
private function removeMessages(array $data, $queue, string $connection): void
64+
private function registerJobEventListener(): void
6065
{
61-
$batchIds = array_column($data['data'], 'batchIds');
62-
$batchIds = array_chunk($batchIds, 10);
66+
/** @var Dispatcher $eventDispatcher */
67+
$eventDispatcher = $this->app['events'];
68+
69+
$eventDispatcher->listen(JobProcessed::class, $this->handleJobProcessed(...));
70+
}
71+
72+
private function handleJobProcessed(JobProcessed $event): void
73+
{
74+
if (! $this->shouldProcessJob($event)) {
75+
return;
76+
}
77+
78+
$messageCount = $this->getMessageCount($event->job);
79+
80+
if ($messageCount === 1) {
81+
$event->job->delete();
82+
83+
return;
84+
}
85+
86+
$this->removeBatchMessages($event->job, $event->connectionName);
87+
}
88+
89+
private function shouldProcessJob(JobProcessed $event): bool
90+
{
91+
/** @var ConfigRepository $config */
92+
$config = $this->app['config'];
93+
94+
$connections = $config->get('queue.connections', []);
95+
96+
return array_key_exists($event->connectionName, $connections);
97+
}
98+
99+
private function getMessageCount(Job $job): int
100+
{
101+
$queueId = $this->extractQueueId($job->getQueue());
102+
103+
/** @var ConfigRepository $config */
104+
$config = $this->app['config'];
105+
106+
$handlers = $config->get(self::CONFIG_KEY . '.handlers', []);
107+
108+
if (array_key_exists($queueId, $handlers)) {
109+
return (int) $handlers[$queueId]['count'];
110+
}
63111

64-
$config = Config::get('queue.connections.' . $connection);
112+
$defaultHandler = $config->get(self::CONFIG_KEY . '.default-handler', []);
65113

66-
$sqsClientConfig = [
67-
// 'profile' => 'default',
68-
'region' => Config::get('queue.connections.' . $connection . '.region'),
69-
'version' => '2012-11-05',
114+
return (int) ($defaultHandler['count'] ?? 1);
115+
}
116+
117+
private function extractQueueId(string $queue): string
118+
{
119+
$segments = explode('/', $queue);
120+
121+
return array_pop($segments) ?: throw new InvalidArgumentException("Invalid queue format: {$queue}");
122+
}
123+
124+
private function removeBatchMessages(Job $job, string $connectionName): void
125+
{
126+
try {
127+
$payload = $job->payload();
128+
$batchIds = $this->extractBatchIds($payload);
129+
130+
if (empty($batchIds)) {
131+
return;
132+
}
133+
134+
$sqsClient = $this->createSqsClient($connectionName);
135+
$this->deleteBatchMessages($sqsClient, $batchIds, $job->getQueue());
136+
137+
} catch (AwsException $exception) {
138+
Log::error('AWS SQS client error during message removal', [
139+
'error' => $exception->getMessage(),
140+
'connection' => $connectionName,
141+
'queue' => $job->getQueue(),
142+
]);
143+
} catch (RuntimeException $exception) {
144+
Log::error('Failed to remove SQS messages', [
145+
'error' => $exception->getMessage(),
146+
'connection' => $connectionName,
147+
'queue' => $job->getQueue(),
148+
]);
149+
}
150+
}
151+
152+
/**
153+
* @param array<string, mixed> $payload
154+
* @return array<int, array<string, mixed>>
155+
*/
156+
private function extractBatchIds(array $payload): array
157+
{
158+
if (! isset($payload['data']['data'])) {
159+
return [];
160+
}
161+
162+
$batchIds = array_column($payload['data']['data'], 'batchIds');
163+
164+
return array_chunk($batchIds, self::BATCH_SIZE);
165+
}
166+
167+
private function createSqsClient(string $connectionName): SqsClient
168+
{
169+
/** @var ConfigRepository $config */
170+
$config = $this->app['config'];
171+
172+
$connectionConfig = $config->get("queue.connections.{$connectionName}");
173+
174+
if (! is_array($connectionConfig)) {
175+
throw new InvalidArgumentException("Invalid connection configuration for: {$connectionName}");
176+
}
177+
178+
$sqsConfig = [
179+
'region' => $connectionConfig['region'] ?? throw new InvalidArgumentException("Missing region for connection: {$connectionName}"),
180+
'version' => self::SQS_VERSION,
70181
'http' => [
71-
'timeout' => 30,
72-
'connect_timeout' => 30,
182+
'timeout' => self::DEFAULT_TIMEOUT,
183+
'connect_timeout' => self::DEFAULT_TIMEOUT,
73184
],
74185
];
75186

76-
if (isset($config['key'], $config['secret'])) {
77-
$sqsClientConfig['credentials'] = Arr::only($config, ['key', 'secret']);
187+
if (isset($connectionConfig['key'], $connectionConfig['secret'])) {
188+
$sqsConfig['credentials'] = Arr::only($connectionConfig, ['key', 'secret']);
78189
}
79190

80-
$client = new SqsClient($sqsClientConfig);
191+
return new SqsClient($sqsConfig);
192+
}
81193

194+
/**
195+
* @param array<int, array<string, mixed>> $batchIds
196+
*/
197+
private function deleteBatchMessages(SqsClient $client, array $batchIds, string $queueUrl): void
198+
{
82199
foreach ($batchIds as $batch) {
83-
// Deletes up to ten messages from the specified queue.
84-
try {
85-
$result = $client->deleteMessageBatch([
86-
'Entries' => $batch,
87-
'QueueUrl' => $queue,
88-
]);
89-
90-
if (isset($result['Failed'])) {
91-
$msg = '';
92-
foreach ($result['Failed'] as $failed) {
93-
$msg .= sprintf('Deleting message failed, code = %s, id = %s, msg = %s, senderfault = %s', $failed['Code'], $failed['Id'], $failed['Message'], $failed['SenderFault']);
94-
}
95-
Log::error('Cannot delete some SQS messages: ', [$msg]);
96-
97-
throw new \RuntimeException('Cannot delete some messages, consult log for more info!');
98-
}
99-
// Log::info('Message remove report:', [$result]);
100-
} catch (AwsException $e) {
101-
Log::error('AWS SQS client error:', [$e->getMessage()]);
200+
$result = $client->deleteMessageBatch([
201+
'Entries' => $batch,
202+
'QueueUrl' => $queueUrl,
203+
]);
204+
205+
if (isset($result['Failed']) && ! empty($result['Failed'])) {
206+
$this->handleFailedDeletions($result['Failed']);
102207
}
103208
}
104209
}
210+
211+
/**
212+
* @param array<int, array<string, mixed>> $failedDeletions
213+
*/
214+
private function handleFailedDeletions(array $failedDeletions): void
215+
{
216+
$errorMessages = [];
217+
218+
foreach ($failedDeletions as $failed) {
219+
$errorMessages[] = sprintf(
220+
'Code: %s, ID: %s, Message: %s, Sender Fault: %s',
221+
$failed['Code'] ?? 'unknown',
222+
$failed['Id'] ?? 'unknown',
223+
$failed['Message'] ?? 'unknown',
224+
$failed['SenderFault'] ?? 'unknown'
225+
);
226+
}
227+
228+
$combinedMessage = implode(' | ', $errorMessages);
229+
230+
Log::error('Failed to delete SQS messages', [
231+
'failed_deletions' => $failedDeletions,
232+
'error_summary' => $combinedMessage,
233+
]);
234+
235+
throw new RuntimeException('Failed to delete some SQS messages. Check logs for details.');
236+
}
105237
}

0 commit comments

Comments
 (0)