@@ -21,7 +21,7 @@ public function boot(): void
2121 __DIR__ . '/../config/sqs-queue-reader.php ' => config_path ('sqs-queue-reader.php ' ),
2222 ], 'config ' );
2323
24- Queue::after (static function (JobProcessed $ event ) {
24+ Queue::after (function (JobProcessed $ event ) {
2525 if ($ event ->connectionName === 'sqs-json ' ) {
2626 $ queue = $ event ->job ->getQueue ();
2727
@@ -35,47 +35,7 @@ public function boot(): void
3535 if ($ count === 1 ) {
3636 $ event ->job ->delete ();
3737 } else {
38- $ data = $ event ->job ->payload ();
39-
40- $ batchIds = array_column ($ data ['data ' ], 'batchIds ' );
41- $ batchIds = array_chunk ($ batchIds , 10 );
42-
43- $ client = new SqsClient ([
44- //'profile' => 'default',
45- 'region ' => Config::get ('queue.connections.sqs-json.region ' ),
46- 'version ' => '2012-11-05 ' ,
47- 'credentials ' => Arr::only (Config::get ('queue.connections.sqs-json ' ), ['key ' , 'secret ' ]),
48- ]);
49-
50- foreach ($ batchIds as $ batch ) {
51- //Deletes up to ten messages from the specified queue.
52- /*
53- $result = $event->job->deleteMessageBatch([
54- 'Entries' => $batch,
55- 'QueueUrl' => $queue,
56- ]);
57- */
58-
59- try {
60- $ result = $ client ->deleteMessageBatch ([
61- 'Entries ' => $ batch ,
62- 'QueueUrl ' => $ queue ,
63- ]);
64-
65- if (isset ($ result ['Failed ' ])) {
66- $ msg = '' ;
67- foreach ($ result ['Failed ' ] as $ failed ) {
68- $ msg .= sprintf ("Deleting message failed, code = %s, id = %s, msg = %s, senderfault = %s " , $ failed ['Code ' ], $ failed ['Id ' ], $ failed ['Message ' ], $ failed ['SenderFault ' ]);
69- }
70- Log::error ('Cannot delete some SQS messages: ' , [$ msg ]);
71-
72- throw new \RuntimeException ("Cannot delete some messages, consult log for more info! " );
73- }
74- //Log::info('Message remove report:', [$result]);
75- } catch (AwsException $ e ) {
76- Log::error ('AWS SQS client error: ' , [$ e ->getMessage ()]);
77- }
78- }
38+ $ this ->removeMessages ($ event ->job ->payload (), $ queue );
7939 }
8040 }
8141 });
@@ -92,4 +52,40 @@ public function register(): void
9252 });
9353 });
9454 }
55+
56+ private function removeMessages (array $ data , $ queue ): void
57+ {
58+ $ batchIds = array_column ($ data ['data ' ], 'batchIds ' );
59+ $ batchIds = array_chunk ($ batchIds , 10 );
60+
61+ $ client = new SqsClient ([
62+ //'profile' => 'default',
63+ 'region ' => Config::get ('queue.connections.sqs-json.region ' ),
64+ 'version ' => '2012-11-05 ' ,
65+ 'credentials ' => Arr::only (Config::get ('queue.connections.sqs-json ' ), ['key ' , 'secret ' ]),
66+ ]);
67+
68+ foreach ($ batchIds as $ batch ) {
69+ //Deletes up to ten messages from the specified queue.
70+ try {
71+ $ result = $ client ->deleteMessageBatch ([
72+ 'Entries ' => $ batch ,
73+ 'QueueUrl ' => $ queue ,
74+ ]);
75+
76+ if (isset ($ result ['Failed ' ])) {
77+ $ msg = '' ;
78+ foreach ($ result ['Failed ' ] as $ failed ) {
79+ $ msg .= sprintf ("Deleting message failed, code = %s, id = %s, msg = %s, senderfault = %s " , $ failed ['Code ' ], $ failed ['Id ' ], $ failed ['Message ' ], $ failed ['SenderFault ' ]);
80+ }
81+ Log::error ('Cannot delete some SQS messages: ' , [$ msg ]);
82+
83+ throw new \RuntimeException ("Cannot delete some messages, consult log for more info! " );
84+ }
85+ //Log::info('Message remove report:', [$result]);
86+ } catch (AwsException $ e ) {
87+ Log::error ('AWS SQS client error: ' , [$ e ->getMessage ()]);
88+ }
89+ }
90+ }
9591}
0 commit comments