Skip to content

Commit 50943d2

Browse files
committed
Support dbal v3
1 parent 2e5c41c commit 50943d2

File tree

8 files changed

+69
-43
lines changed

8 files changed

+69
-43
lines changed

pkg/dbal/DbalConsumer.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class DbalConsumer implements Consumer
1515
{
1616
use ConsumerPollingTrait;
1717
use DbalConsumerHelperTrait;
18+
use DbalTypeResolverTrait;
1819

1920
/**
2021
* @var DbalContext

pkg/dbal/DbalConsumerHelperTrait.php

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Doctrine\DBAL\Connection;
88
use Doctrine\DBAL\Exception\RetryableException;
9-
use Doctrine\DBAL\Types\Type;
109
use Ramsey\Uuid\Uuid;
1110

1211
trait DbalConsumerHelperTrait
@@ -39,7 +38,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
3938
->addOrderBy('priority', 'asc')
4039
->addOrderBy('published_at', 'asc')
4140
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
42-
->setParameter('delayedUntil', $now, Type::INTEGER)
41+
->setParameter('delayedUntil', $now, static::resolveDbalType('INTEGER'))
4342
->setMaxResults(1);
4443

4544
$update = $this->getConnection()->createQueryBuilder()
@@ -48,8 +47,8 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
4847
->set('redeliver_after', ':redeliverAfter')
4948
->andWhere('id = :messageId')
5049
->andWhere('delivery_id IS NULL')
51-
->setParameter('deliveryId', $deliveryId, Type::GUID)
52-
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
50+
->setParameter('deliveryId', $deliveryId, static::resolveDbalType('GUID'))
51+
->setParameter('redeliverAfter', $now + $redeliveryDelay, static::resolveDbalType('BIGINT'))
5352
;
5453

5554
while (microtime(true) < $endAt) {
@@ -60,14 +59,14 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
6059
}
6160

6261
$update
63-
->setParameter('messageId', $result['id'], Type::GUID);
62+
->setParameter('messageId', $result['id'], static::resolveDbalType('GUID'));
6463

6564
if ($update->execute()) {
6665
$deliveredMessage = $this->getConnection()->createQueryBuilder()
6766
->select('*')
6867
->from($this->getContext()->getTableName())
6968
->andWhere('delivery_id = :deliveryId')
70-
->setParameter('deliveryId', $deliveryId, Type::GUID)
69+
->setParameter('deliveryId', $deliveryId, static::resolveDbalType('GUID'))
7170
->setMaxResults(1)
7271
->execute()
7372
->fetch();
@@ -103,9 +102,9 @@ protected function redeliverMessages(): void
103102
->set('redelivered', ':redelivered')
104103
->andWhere('redeliver_after < :now')
105104
->andWhere('delivery_id IS NOT NULL')
106-
->setParameter(':now', time(), Type::BIGINT)
107-
->setParameter('deliveryId', null, Type::GUID)
108-
->setParameter('redelivered', true, Type::BOOLEAN)
105+
->setParameter(':now', time(), static::resolveDbalType('BIGINT'))
106+
->setParameter('deliveryId', null, static::resolveDbalType('GUID'))
107+
->setParameter('redelivered', true, static::resolveDbalType('BOOLEAN'))
109108
;
110109

111110
try {
@@ -131,8 +130,8 @@ protected function removeExpiredMessages(): void
131130
->andWhere('delivery_id IS NULL')
132131
->andWhere('redelivered = :redelivered')
133132

134-
->setParameter(':now', time(), Type::BIGINT)
135-
->setParameter('redelivered', false, Type::BOOLEAN)
133+
->setParameter(':now', time(), static::resolveDbalType('BIGINT'))
134+
->setParameter('redelivered', false, static::resolveDbalType('BOOLEAN'))
136135
;
137136

138137
try {
@@ -153,7 +152,7 @@ private function deleteMessage(string $deliveryId): void
153152
$this->getConnection()->delete(
154153
$this->getContext()->getTableName(),
155154
['delivery_id' => $deliveryId],
156-
['delivery_id' => Type::GUID]
155+
['delivery_id' => static::resolveDbalType('GUID')]
157156
);
158157
}
159158
}

pkg/dbal/DbalContext.php

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Doctrine\DBAL\Connection;
88
use Doctrine\DBAL\Schema\Table;
9-
use Doctrine\DBAL\Types\Type;
109
use Interop\Queue\Consumer;
1110
use Interop\Queue\Context;
1211
use Interop\Queue\Destination;
@@ -183,7 +182,7 @@ public function purgeQueue(Queue $queue): void
183182
$this->getDbalConnection()->delete(
184183
$this->getTableName(),
185184
['queue' => $queue->getQueueName()],
186-
['queue' => Type::STRING]
185+
['queue' => static::resolveDbalType('STRING')]
187186
);
188187
}
189188

@@ -221,18 +220,18 @@ public function createDataBaseTable(): void
221220

222221
$table = new Table($this->getTableName());
223222

224-
$table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]);
225-
$table->addColumn('published_at', Type::BIGINT);
226-
$table->addColumn('body', Type::TEXT, ['notnull' => false]);
227-
$table->addColumn('headers', Type::TEXT, ['notnull' => false]);
228-
$table->addColumn('properties', Type::TEXT, ['notnull' => false]);
229-
$table->addColumn('redelivered', Type::BOOLEAN, ['notnull' => false]);
230-
$table->addColumn('queue', Type::STRING);
231-
$table->addColumn('priority', Type::SMALLINT, ['notnull' => false]);
232-
$table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]);
233-
$table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]);
234-
$table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
235-
$table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]);
223+
$table->addColumn('id', static::resolveDbalType('GUID'), ['length' => 16, 'fixed' => true]);
224+
$table->addColumn('published_at', static::resolveDbalType('BIGINT'));
225+
$table->addColumn('body', static::resolveDbalType('TEXT'), ['notnull' => false]);
226+
$table->addColumn('headers', static::resolveDbalType('TEXT'), ['notnull' => false]);
227+
$table->addColumn('properties', static::resolveDbalType('TEXT'), ['notnull' => false]);
228+
$table->addColumn('redelivered', static::resolveDbalType('BOOLEAN'), ['notnull' => false]);
229+
$table->addColumn('queue', static::resolveDbalType('STRING'));
230+
$table->addColumn('priority', static::resolveDbalType('SMALLINT'), ['notnull' => false]);
231+
$table->addColumn('delayed_until', static::resolveDbalType('BIGINT'), ['notnull' => false]);
232+
$table->addColumn('time_to_live', static::resolveDbalType('BIGINT'), ['notnull' => false]);
233+
$table->addColumn('delivery_id', static::resolveDbalType('GUID'), ['length' => 16, 'fixed' => true, 'notnull' => false]);
234+
$table->addColumn('redeliver_after', static::resolveDbalType('BIGINT'), ['notnull' => false]);
236235

237236
$table->setPrimaryKey(['id']);
238237
$table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']);

pkg/dbal/DbalProducer.php

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace Enqueue\Dbal;
66

7-
use Doctrine\DBAL\Types\Type;
87
use Interop\Queue\Destination;
98
use Interop\Queue\Exception\Exception;
109
use Interop\Queue\Exception\InvalidDestinationException;
@@ -15,6 +14,8 @@
1514

1615
class DbalProducer implements Producer
1716
{
17+
use DbalTypeResolverTrait;
18+
1819
/**
1920
* @var int|null
2021
*/
@@ -107,18 +108,18 @@ public function send(Destination $destination, Message $message): void
107108

108109
try {
109110
$rowsAffected = $this->context->getDbalConnection()->insert($this->context->getTableName(), $dbalMessage, [
110-
'id' => Type::GUID,
111-
'published_at' => Type::INTEGER,
112-
'body' => Type::TEXT,
113-
'headers' => Type::TEXT,
114-
'properties' => Type::TEXT,
115-
'priority' => Type::SMALLINT,
116-
'queue' => Type::STRING,
117-
'time_to_live' => Type::INTEGER,
118-
'delayed_until' => Type::INTEGER,
119-
'redelivered' => Type::SMALLINT,
120-
'delivery_id' => Type::STRING,
121-
'redeliver_after' => Type::BIGINT,
111+
'id' => static::resolveDbalType('GUID'),
112+
'published_at' => static::resolveDbalType('INTEGER'),
113+
'body' => static::resolveDbalType('TEXT'),
114+
'headers' => static::resolveDbalType('TEXT'),
115+
'properties' => static::resolveDbalType('TEXT'),
116+
'priority' => static::resolveDbalType('SMALLINT'),
117+
'queue' => static::resolveDbalType('STRING'),
118+
'time_to_live' => static::resolveDbalType('INTEGER'),
119+
'delayed_until' => static::resolveDbalType('INTEGER'),
120+
'redelivered' => static::resolveDbalType('SMALLINT'),
121+
'delivery_id' => static::resolveDbalType('STRING'),
122+
'redeliver_after' => static::resolveDbalType('BIGINT'),
122123
]);
123124

124125
if (1 !== $rowsAffected) {

pkg/dbal/DbalSubscriptionConsumer.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
class DbalSubscriptionConsumer implements SubscriptionConsumer
1212
{
1313
use DbalConsumerHelperTrait;
14+
use DbalTypeResolverTrait;
1415

1516
/**
1617
* @var DbalContext

pkg/dbal/DbalTypeResolverTrait.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Dbal;
6+
7+
trait DbalTypeResolverTrait
8+
{
9+
protected static function resolveDbalType(string $typeName): string
10+
{
11+
static $reflection;
12+
13+
$className = 'Doctrine\\DBAL\\Types\\Type';
14+
15+
// Use types when using dbal v3
16+
if (class_exists('Doctrine\\DBAL\\Types\\Types')) {
17+
$className = 'Doctrine\\DBAL\\Types\\Types';
18+
}
19+
20+
return ($reflection = $reflection ?? new \ReflectionClass($className))
21+
->getConstant($typeName);
22+
}
23+
}

pkg/dbal/Tests/DbalConsumerTest.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
namespace Enqueue\Dbal\Tests;
66

77
use Doctrine\DBAL\Connection;
8-
use Doctrine\DBAL\Types\Type;
8+
use Doctrine\DBAL\Types\Types;
99
use Enqueue\Dbal\DbalConsumer;
1010
use Enqueue\Dbal\DbalContext;
1111
use Enqueue\Dbal\DbalDestination;
1212
use Enqueue\Dbal\DbalMessage;
1313
use Enqueue\Dbal\DbalProducer;
14+
use Enqueue\Dbal\DbalTypeResolverTrait;
1415
use Enqueue\Test\ClassExtensionTrait;
1516
use Interop\Queue\Consumer;
1617
use Interop\Queue\Exception\InvalidMessageException;
@@ -22,6 +23,7 @@
2223
class DbalConsumerTest extends TestCase
2324
{
2425
use ClassExtensionTrait;
26+
use DbalTypeResolverTrait;
2527

2628
public function testShouldImplementConsumerInterface()
2729
{
@@ -72,7 +74,7 @@ public function testShouldDeleteMessageOnAcknowledge()
7274
->with(
7375
'some-table-name',
7476
['delivery_id' => $deliveryId->toString()],
75-
['delivery_id' => Type::GUID]
77+
['delivery_id' => static::resolveDbalType('GUID')]
7678
)
7779
;
7880

@@ -143,7 +145,7 @@ public function testShouldDeleteMessageFromQueueOnReject()
143145
->with(
144146
'some-table-name',
145147
['delivery_id' => $deliveryId->toString()],
146-
['delivery_id' => Type::GUID]
148+
['delivery_id' => static::resolveDbalType('GUID')]
147149
)
148150
;
149151

pkg/dbal/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"require": {
99
"php": "^7.3|^8.0",
1010
"queue-interop/queue-interop": "^0.8",
11-
"doctrine/dbal": "^2.12",
11+
"doctrine/dbal": "^2.12|^3.1",
1212
"doctrine/persistence": "^1.3.3|^2.0",
1313
"ramsey/uuid": "^3.5|^4"
1414
},

0 commit comments

Comments
 (0)