Skip to content

Commit d9af231

Browse files
author
Henning
committed
micro select for each queue
1 parent 1f00e43 commit d9af231

File tree

1 file changed

+18
-22
lines changed

1 file changed

+18
-22
lines changed

pkg/dbal/DbalConsumerHelperTrait.php

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,6 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
2929

3030
$endAt = microtime(true) + 0.2; // add 200ms
3131

32-
$select = $this->getConnection()->createQueryBuilder()
33-
->select('MIN(id) as id', 'queue')
34-
->from($this->getContext()->getTableName())
35-
->andWhere('queue IN (:queues)')
36-
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
37-
->andWhere('delivery_id IS NULL')
38-
->addOrderBy('priority', 'asc')
39-
->addOrderBy('published_at', 'asc')
40-
->groupBy('queue')
41-
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
42-
->setParameter('delayedUntil', $now, DbalType::INTEGER);
43-
4432
$update = $this->getConnection()->createQueryBuilder()
4533
->update($this->getContext()->getTableName())
4634
->set('delivery_id', ':deliveryId')
@@ -53,12 +41,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
5341

5442
while (microtime(true) < $endAt) {
5543
try {
56-
$results = $select->execute()->fetch();
57-
if (empty($results)) {
58-
return null;
59-
}
60-
61-
$result = $this->getResultByQueueList($results, $queues);
44+
$result = $this->getResultByQueueList($queues, $now);
6245
if (empty($result)) {
6346
return null;
6447
}
@@ -161,13 +144,26 @@ private function deleteMessage(string $deliveryId): void
161144
);
162145
}
163146

164-
private function getResultByQueueList(array $results, array $queues): ?array
147+
private function getResultByQueueList(array $queues, int $now): ?array
165148
{
166-
$results = array_combine(array_column($results, 'queue'), $results);
149+
$select = $this->getConnection()->createQueryBuilder()
150+
->select('id')
151+
->from($this->getContext()->getTableName())
152+
->andWhere('queue = :queue')
153+
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
154+
->andWhere('delivery_id IS NULL')
155+
->addOrderBy('priority', 'asc')
156+
->addOrderBy('published_at', 'asc')
157+
->setParameter('delayedUntil', $now, DbalType::INTEGER)
158+
->setMaxResults(1);
167159

168160
foreach ($queues as $queue) {
169-
if (isset($results[$queue])) {
170-
return $results[$queue];
161+
$select->setParameter('queue', $queue, DbalType::STRING);
162+
163+
$result = $select->execute()->fetch();
164+
165+
if (!empty($result)) {
166+
return $result;
171167
}
172168
}
173169

0 commit comments

Comments
 (0)