Skip to content

Commit b313429

Browse files
author
Henning
committed
add logic to walk through the different queues after each message
1 parent f52be4b commit b313429

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

pkg/dbal/DbalConsumerHelperTrait.php

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,17 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
3030
$endAt = microtime(true) + 0.2; // add 200ms
3131

3232
$select = $this->getConnection()->createQueryBuilder()
33-
->select('id')
33+
->select('MIN(id)')
34+
->addSelect('queue')
3435
->from($this->getContext()->getTableName())
3536
->andWhere('queue IN (:queues)')
3637
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
3738
->andWhere('delivery_id IS NULL')
3839
->addOrderBy('priority', 'asc')
3940
->addOrderBy('published_at', 'asc')
41+
->groupBy('queue')
4042
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
41-
->setParameter('delayedUntil', $now, DbalType::INTEGER)
42-
->setMaxResults(1);
43+
->setParameter('delayedUntil', $now, DbalType::INTEGER);
4344

4445
$update = $this->getConnection()->createQueryBuilder()
4546
->update($this->getContext()->getTableName())
@@ -53,7 +54,12 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
5354

5455
while (microtime(true) < $endAt) {
5556
try {
56-
$result = $select->execute()->fetch();
57+
$results = $select->execute()->fetch();
58+
if (empty($results)) {
59+
return null;
60+
}
61+
62+
$result = $this->getResultByQueueList($results, $queues);
5763
if (empty($result)) {
5864
return null;
5965
}
@@ -155,4 +161,17 @@ private function deleteMessage(string $deliveryId): void
155161
['delivery_id' => DbalType::GUID]
156162
);
157163
}
164+
165+
private function getResultByQueueList(array $results, array $queues): ?array
166+
{
167+
$results = array_combine(array_column($results, 'queue'), $results);
168+
169+
foreach ($queues as $queue) {
170+
if (isset($results[$queue])) {
171+
return $results[$queue];
172+
}
173+
}
174+
175+
return null;
176+
}
158177
}

pkg/dbal/DbalSubscriptionConsumer.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ public function consume(int $timeout = 0): void
8787

8888
$queueNames = [];
8989
foreach (array_keys($this->subscribers) as $queueName) {
90-
$queueNames[$queueName] = $queueName;
90+
$queueNames[] = $queueName;
9191
}
92+
$queueNames = array_unique($queueNames);
9293

9394
$timeout /= 1000;
9495
$now = time();
@@ -108,6 +109,11 @@ public function consume(int $timeout = 0): void
108109
if (false === call_user_func($callback, $message, $consumer)) {
109110
return;
110111
}
112+
113+
$queueNames = array_filter($queueNames, static function ($queueName) use ($message) {
114+
return $message->getQueue() !== $queueName;
115+
});
116+
$queueNames[] = $message->getQueue();
111117
} else {
112118
usleep($this->getPollingInterval() * 1000);
113119
}

0 commit comments

Comments
 (0)