1414use Doctrine \DBAL \Connection as DBALConnection ;
1515use Doctrine \DBAL \DBALException ;
1616use Doctrine \DBAL \Exception \TableNotFoundException ;
17+ use Doctrine \DBAL \Query \QueryBuilder ;
1718use Doctrine \DBAL \Schema \Schema ;
1819use Doctrine \DBAL \Schema \Synchronizer \SingleDatabaseSynchronizer ;
1920use Doctrine \DBAL \Types \Type ;
@@ -128,25 +129,14 @@ public function get(): ?array
128129 {
129130 $ this ->driverConnection ->beginTransaction ();
130131 try {
131- $ query = $ this ->driverConnection ->createQueryBuilder ()
132- ->select ('m.* ' )
133- ->from ($ this ->configuration ['table_name ' ], 'm ' )
134- ->where ('m.delivered_at is null OR m.delivered_at < :redeliver_limit ' )
135- ->andWhere ('m.available_at <= :now ' )
136- ->andWhere ('m.queue_name = :queue_name ' )
132+ $ query = $ this ->createAvailableMessagesQueryBuilder ()
137133 ->orderBy ('available_at ' , 'ASC ' )
138134 ->setMaxResults (1 );
139135
140- $ now = \DateTime::createFromFormat ('U.u ' , microtime (true ));
141- $ redeliverLimit = (clone $ now )->modify (sprintf ('-%d seconds ' , $ this ->configuration ['redeliver_timeout ' ]));
142136 // use SELECT ... FOR UPDATE to lock table
143137 $ doctrineEnvelope = $ this ->executeQuery (
144138 $ query ->getSQL ().' ' .$ this ->driverConnection ->getDatabasePlatform ()->getWriteLockSQL (),
145- [
146- ':now ' => self ::formatDateTime ($ now ),
147- ':queue_name ' => $ this ->configuration ['queue_name ' ],
148- ':redeliver_limit ' => self ::formatDateTime ($ redeliverLimit ),
149- ]
139+ $ query ->getParameters ()
150140 )->fetch ();
151141
152142 if (false === $ doctrineEnvelope ) {
@@ -161,6 +151,7 @@ public function get(): ?array
161151 ->update ($ this ->configuration ['table_name ' ])
162152 ->set ('delivered_at ' , ':delivered_at ' )
163153 ->where ('id = :id ' );
154+ $ now = \DateTime::createFromFormat ('U.u ' , microtime (true ));
164155 $ this ->executeQuery ($ queryBuilder ->getSQL (), [
165156 ':id ' => $ doctrineEnvelope ['id ' ],
166157 ':delivered_at ' => self ::formatDateTime ($ now ),
@@ -200,6 +191,33 @@ public function setup(): void
200191 $ synchronizer ->updateSchema ($ this ->getSchema (), true );
201192 }
202193
194+ public function getMessageCount (): int
195+ {
196+ $ queryBuilder = $ this ->createAvailableMessagesQueryBuilder ()
197+ ->select ('COUNT(m.id) as message_count ' )
198+ ->setMaxResults (1 );
199+
200+ return $ this ->executeQuery ($ queryBuilder ->getSQL (), $ queryBuilder ->getParameters ())->fetchColumn ();
201+ }
202+
203+ private function createAvailableMessagesQueryBuilder (): QueryBuilder
204+ {
205+ $ now = \DateTime::createFromFormat ('U.u ' , microtime (true ));
206+ $ redeliverLimit = (clone $ now )->modify (sprintf ('-%d seconds ' , $ this ->configuration ['redeliver_timeout ' ]));
207+
208+ return $ this ->driverConnection ->createQueryBuilder ()
209+ ->select ('m.* ' )
210+ ->from ($ this ->configuration ['table_name ' ], 'm ' )
211+ ->where ('m.delivered_at is null OR m.delivered_at < :redeliver_limit ' )
212+ ->andWhere ('m.available_at <= :now ' )
213+ ->andWhere ('m.queue_name = :queue_name ' )
214+ ->setParameters ([
215+ ':now ' => self ::formatDateTime ($ now ),
216+ ':queue_name ' => $ this ->configuration ['queue_name ' ],
217+ ':redeliver_limit ' => self ::formatDateTime ($ redeliverLimit ),
218+ ]);
219+ }
220+
203221 private function executeQuery (string $ sql , array $ parameters = [])
204222 {
205223 $ stmt = null ;
0 commit comments