@@ -12,6 +12,7 @@ class TaskHandler
1212{
1313 private $ request ;
1414 private $ publicKey ;
15+ private $ config ;
1516
1617 public function __construct (CloudTasksClient $ client , Request $ request , OpenIdVerificator $ publicKey )
1718 {
@@ -28,20 +29,29 @@ public function handle($task = null)
2829 {
2930 $ task = $ task ?: $ this ->captureTask ();
3031
31- $ command = unserialize ($ task ['data ' ]['command ' ]);
32- $ connection = $ command ->connection ?? config ('queue.default ' );
32+ $ this ->loadQueueConnectionConfiguration ($ task );
3333
34- $ this ->authorizeRequest ($ connection );
34+ $ this ->authorizeRequest ();
3535
36- $ this ->listenForEvents ($ connection );
36+ $ this ->listenForEvents ();
37+
38+ $ this ->handleTask ($ task );
39+ }
3740
38- $ this ->handleTask ($ connection , $ task );
41+ private function loadQueueConnectionConfiguration ($ task )
42+ {
43+ $ command = unserialize ($ task ['data ' ]['command ' ]);
44+ $ connection = $ command ->connection ?? config ('queue.default ' );
45+ $ this ->config = array_merge (
46+ config ("queue.connections. {$ connection }" ),
47+ ['connection ' => $ connection ]
48+ );
3949 }
4050
4151 /**
4252 * @throws CloudTasksException
4353 */
44- public function authorizeRequest ($ connection )
54+ public function authorizeRequest ()
4555 {
4656 if (!$ this ->request ->hasHeader ('Authorization ' )) {
4757 throw new CloudTasksException ('Missing [Authorization] header ' );
@@ -52,7 +62,7 @@ public function authorizeRequest($connection)
5262
5363 $ decodedToken = $ this ->publicKey ->decodeOpenIdToken ($ openIdToken , $ kid );
5464
55- $ this ->validateToken ($ connection , $ decodedToken );
65+ $ this ->validateToken ($ decodedToken );
5666 }
5767
5868 /**
@@ -61,13 +71,13 @@ public function authorizeRequest($connection)
6171 * @param $openIdToken
6272 * @throws CloudTasksException
6373 */
64- protected function validateToken ($ connection , $ openIdToken )
74+ protected function validateToken ($ openIdToken )
6575 {
6676 if (!in_array ($ openIdToken ->iss , ['https://accounts.google.com ' , 'accounts.google.com ' ])) {
6777 throw new CloudTasksException ('The given OpenID token is not valid ' );
6878 }
6979
70- if ($ openIdToken ->aud != Config:: handler ( $ connection ) ) {
80+ if ($ openIdToken ->aud != $ this -> config [ ' handler ' ] ) {
7181 throw new CloudTasksException ('The given OpenID token is not valid ' );
7282 }
7383
@@ -96,11 +106,11 @@ private function captureTask()
96106 return $ task ;
97107 }
98108
99- private function listenForEvents ($ connection )
109+ private function listenForEvents ()
100110 {
101- app ('events ' )->listen (JobFailed::class, function ($ event ) use ( $ connection ) {
111+ app ('events ' )->listen (JobFailed::class, function ($ event ) {
102112 app ('queue.failer ' )->log (
103- $ connection , $ event ->job ->getQueue (),
113+ $ this -> config [ ' connection ' ] , $ event ->job ->getQueue (),
104114 $ event ->job ->getRawBody (), $ event ->exception
105115 );
106116 });
@@ -110,24 +120,24 @@ private function listenForEvents($connection)
110120 * @param $task
111121 * @throws CloudTasksException
112122 */
113- private function handleTask ($ connection , $ task )
123+ private function handleTask ($ task )
114124 {
115125 $ job = new CloudTasksJob ($ task );
116126
117127 $ job ->setAttempts (request ()->header ('X-CloudTasks-TaskRetryCount ' ) + 1 );
118128 $ job ->setQueue (request ()->header ('X-Cloudtasks-Queuename ' ));
119- $ job ->setMaxTries ($ this ->getQueueMaxTries ($ connection , $ job ));
129+ $ job ->setMaxTries ($ this ->getQueueMaxTries ($ job ));
120130
121131 $ worker = $ this ->getQueueWorker ();
122132
123- $ worker ->process ($ connection , $ job , new WorkerOptions ());
133+ $ worker ->process ($ this -> config [ ' connection ' ] , $ job , new WorkerOptions ());
124134 }
125135
126- private function getQueueMaxTries ($ connection , CloudTasksJob $ job )
136+ private function getQueueMaxTries (CloudTasksJob $ job )
127137 {
128138 $ queueName = $ this ->client ->queueName (
129- Config:: project ( $ connection ) ,
130- Config:: location ( $ connection ) ,
139+ $ this -> config [ ' project ' ] ,
140+ $ this -> config [ ' location ' ] ,
131141 $ job ->getQueue ()
132142 );
133143
0 commit comments