1111use Aws \Sqs \SqsClient ;
1212use yii \base \NotSupportedException ;
1313use yii \queue \cli \Queue as CliQueue ;
14+ use yii \queue \serializers \JsonSerializer ;
1415
1516/**
1617 * SQS Queue.
@@ -52,8 +53,14 @@ class Queue extends CliQueue
5253
5354 /**
5455 * @var string command class name
56+ * @inheritdoc
5557 */
5658 public $ commandClass = Command::class;
59+ /**
60+ * Json serializer by default.
61+ * @inheritdoc
62+ */
63+ public $ serializer = JsonSerializer::class;
5764
5865 /**
5966 * @var SqsClient
@@ -80,12 +87,12 @@ public function run($repeat, $timeout = 0)
8087 {
8188 return $ this ->runWorker (function (callable $ canContinue ) use ($ repeat , $ timeout ) {
8289 while ($ canContinue ()) {
83- if (($ payload = $ this ->getPayload ($ timeout )) !== null ) {
84- list ( $ ttr , $ message ) = explode ( ' ; ' , base64_decode ( $ payload ['Body ' ]), 2 ) ;
85- //reserve it so it is not visible to another worker till ttr
86- $ this -> reserve ( $ payload , $ ttr ) ;
87-
88- if ($ this ->handleMessage (null , $ message , $ ttr , 1 )) {
90+ if (($ payload = $ this ->reserve ($ timeout )) !== null ) {
91+ $ id = $ payload ['MessageId ' ] ;
92+ $ message = $ payload [ ' Body ' ];
93+ $ ttr = ( int ) $ payload [ ' MessageAttributes ' ][ ' TTR ' ][ ' StringValue ' ] ;
94+ $ attempt = ( int ) $ payload [ ' Attributes ' ][ ' ApproximateReceiveCount ' ];
95+ if ($ this ->handleMessage ($ id , $ message , $ ttr , $ attempt )) {
8996 $ this ->release ($ payload );
9097 }
9198 } elseif (!$ repeat ) {
@@ -96,64 +103,50 @@ public function run($repeat, $timeout = 0)
96103 }
97104
98105 /**
99- * Gets a single message from SQS queue.
106+ * Gets a single message from SQS queue and sets the visibility to reserve message .
100107 *
101108 * @param int $timeout number of seconds for long polling. Must be between 0 and 20.
102109 * @return null|array payload.
103110 */
104- private function getPayload ($ timeout = 0 )
111+ public function reserve ($ timeout )
105112 {
106- $ payload = $ this ->getClient ()->receiveMessage ([
113+ $ response = $ this ->getClient ()->receiveMessage ([
107114 'QueueUrl ' => $ this ->url ,
108115 'AttributeNames ' => ['ApproximateReceiveCount ' ],
116+ 'MessageAttributeNames ' => ['TTR ' ],
109117 'MaxNumberOfMessages ' => 1 ,
118+ 'VisibilityTimeout ' => $ this ->ttr ,
110119 'WaitTimeSeconds ' => $ timeout ,
111120 ]);
112-
113- $ payload = $ payload ['Messages ' ];
114- if ($ payload ) {
115- return array_pop ($ payload );
121+ if (!$ response ['Messages ' ]) {
122+ return null ;
116123 }
117124
118- return null ;
119- }
125+ $ payload = reset ($ response ['Messages ' ]);
120126
121- /**
122- * Set the visibility to reserve message.
123- * So that no other worker can see this message.
124- *
125- * @param array $payload
126- * @param int $ttr
127- */
128- private function reserve ($ payload , $ ttr )
129- {
130- $ receiptHandle = $ payload ['ReceiptHandle ' ];
131- $ this ->getClient ()->changeMessageVisibility ([
132- 'QueueUrl ' => $ this ->url ,
133- 'ReceiptHandle ' => $ receiptHandle ,
134- 'VisibilityTimeout ' => $ ttr ,
135- ]);
127+ $ ttr = (int ) $ payload ['MessageAttributes ' ]['TTR ' ]['StringValue ' ];
128+ if ($ ttr != $ this ->ttr ) {
129+ $ this ->getClient ()->changeMessageVisibility ([
130+ 'QueueUrl ' => $ this ->url ,
131+ 'ReceiptHandle ' => $ payload ['ReceiptHandle ' ],
132+ 'VisibilityTimeout ' => $ ttr ,
133+ ]);
134+ }
135+
136+ return $ payload ;
136137 }
137138
138139 /**
139140 * Mark the message as handled.
140141 *
141142 * @param array $payload
142- * @return bool
143143 */
144144 private function release ($ payload )
145145 {
146- if (empty ($ payload ['ReceiptHandle ' ])) {
147- return false ;
148- }
149-
150- $ receiptHandle = $ payload ['ReceiptHandle ' ];
151- $ response = $ this ->getClient ()->deleteMessage ([
146+ $ this ->getClient ()->deleteMessage ([
152147 'QueueUrl ' => $ this ->url ,
153- 'ReceiptHandle ' => $ receiptHandle ,
148+ 'ReceiptHandle ' => $ payload [ ' ReceiptHandle ' ] ,
154149 ]);
155-
156- return $ response !== null ;
157150 }
158151
159152 /**
@@ -183,13 +176,18 @@ protected function pushMessage($message, $ttr, $delay, $priority)
183176 throw new NotSupportedException ('Priority is not supported in this driver ' );
184177 }
185178
186- $ model = $ this ->getClient ()->sendMessage ([
187- 'DelaySeconds ' => $ delay ,
179+ $ response = $ this ->getClient ()->sendMessage ([
188180 'QueueUrl ' => $ this ->url ,
189- 'MessageBody ' => base64_encode ("$ ttr; $ message " ),
181+ 'MessageBody ' => $ message ,
182+ 'DelaySeconds ' => $ delay ,
183+ 'MessageAttributes ' => [
184+ 'TTR ' => [
185+ 'DataType ' => 'Number ' ,
186+ 'StringValue ' => $ ttr ,
187+ ],
188+ ],
190189 ]);
191-
192- return $ model ['MessageId ' ];
190+ return $ response ['MessageId ' ];
193191 }
194192
195193 /**
@@ -211,22 +209,12 @@ protected function getClient()
211209 //see - http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html#credential-profiles
212210 $ credentials = CredentialProvider::defaultProvider ();
213211 }
212+
214213 $ this ->_client = new SqsClient ([
215214 'credentials ' => $ credentials ,
216215 'region ' => $ this ->region ,
217216 'version ' => $ this ->version ,
218217 ]);
219-
220218 return $ this ->_client ;
221219 }
222-
223- /**
224- * Sets the AWS SQS client instance for the queue.
225- *
226- * @param SqsClient $client AWS SQS client object.
227- */
228- public function setClient (SqsClient $ client )
229- {
230- $ this ->_client = $ client ;
231- }
232220}
0 commit comments