Skip to content

Commit f011768

Browse files
authored
Merge branch 'php-enqueue:master' into master
2 parents ce454a7 + 9514a82 commit f011768

32 files changed

+458
-141
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ jobs:
120120
strategy:
121121
fail-fast: false
122122
matrix:
123-
php: ['7.3', '8.0'] # same as in the container
123+
php: ['7.4', '8.0'] # same as in the container
124124
symfony_version: ['4.4.*', '5.2.*']
125125
dependencies: ['--prefer-lowest', '--prefer-dist']
126126
rdkafka_action: ['exclude-group', 'group']
@@ -159,5 +159,7 @@ jobs:
159159
- run: sed -i 's/525568/16777471/' vendor/kwn/php-rdkafka-stubs/stubs/constants.php
160160

161161
- run: bin/dev -b
162+
env:
163+
PHP_VERSION: ${{ matrix.php }}
162164

163165
- run: bin/test.sh --${{ matrix.rdkafka_action }}=rdkafka

CHANGELOG.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,28 @@
11
# Change Log
22

3+
## [0.10.13](https://github.com/php-enqueue/enqueue-dev/tree/0.10.13) (2021-08-25)
4+
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.10.12...0.10.13)
5+
6+
**Merged pull requests:**
7+
8+
- \[SNSQS\] added possibility to send message attributes using snsqs transport [\#1195](https://github.com/php-enqueue/enqueue-dev/pull/1195) ([onatskyy](https://github.com/onatskyy))
9+
- Add in missing arg [\#1194](https://github.com/php-enqueue/enqueue-dev/pull/1194) ([gdsmith](https://github.com/gdsmith))
10+
- \#1190 add index on delivery\_id to prevent slow queries [\#1191](https://github.com/php-enqueue/enqueue-dev/pull/1191) ([commercewerft](https://github.com/commercewerft))
11+
- Add setTopicArn methods to SnsContext and SnsQsContext [\#1189](https://github.com/php-enqueue/enqueue-dev/pull/1189) ([gdsmith](https://github.com/gdsmith))
12+
13+
## [0.10.11](https://github.com/php-enqueue/enqueue-dev/tree/0.10.11) (2021-04-28)
14+
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.10.10...0.10.11)
15+
16+
**Merged pull requests:**
17+
18+
- Perform at least once delivery when rejecting with requeue [\#1165](https://github.com/php-enqueue/enqueue-dev/pull/1165) ([dgafka](https://github.com/dgafka))
19+
- Fix dbal delivery delay to always keep integer value [\#1161](https://github.com/php-enqueue/enqueue-dev/pull/1161) ([dgafka](https://github.com/dgafka))
20+
- Add SqsConsumer methods to SnsQsConsumer [\#1160](https://github.com/php-enqueue/enqueue-dev/pull/1160) ([gdsmith](https://github.com/gdsmith))
21+
- add subscription\_interval as config for dbal subscription consumer [\#1159](https://github.com/php-enqueue/enqueue-dev/pull/1159) ([mordilion](https://github.com/mordilion))
22+
- register worker callback only once, move to constructor [\#1157](https://github.com/php-enqueue/enqueue-dev/pull/1157) ([cturbelin](https://github.com/cturbelin))
23+
- Try to change doctrine/orm version for supporting 2.8 \(PHP 8 support\). [\#1155](https://github.com/php-enqueue/enqueue-dev/pull/1155) ([GothShoot](https://github.com/GothShoot))
24+
- sns context - fallback for not breaking BC with 10.10 previous versions [\#1149](https://github.com/php-enqueue/enqueue-dev/pull/1149) ([bafor](https://github.com/bafor))
25+
326
## [0.10.10](https://github.com/php-enqueue/enqueue-dev/tree/0.10.10) (2021-03-24)
427
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.10.9...0.10.10)
528

composer.json

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
"queue-interop/amqp-interop": "^0.8.2",
2626
"queue-interop/queue-interop": "^0.8.1",
2727
"bunny/bunny": "^0.4|^0.5",
28-
"php-amqplib/php-amqplib": "^2.12.1",
29-
"doctrine/dbal": "^2.12",
28+
"php-amqplib/php-amqplib": "^3.0",
29+
"doctrine/dbal": "^2.12|^3.1",
3030
"ramsey/uuid": "^3.5|^4",
31-
"psr/log": "^1.1",
31+
"psr/log": "^1.1 || ^2.0 || ^3.0",
3232
"psr/container": "^1",
3333
"makasim/temp-file": "^0.2",
3434
"google/cloud-pubsub": "^1.4.3",
@@ -42,15 +42,16 @@
4242
"php-http/client-common": "^2.2.1",
4343
"richardfullmer/rabbitmq-management-api": "^2.1.1",
4444
"predis/predis": "^1.1",
45-
"thruway/client": "^0.5",
45+
"thruway/client": "^0.5.5",
4646
"thruway/pawl-transport": "^0.5.1",
4747
"influxdb/influxdb-php": "^1.14",
4848
"datadog/php-datadogstatsd": "^1.3",
4949
"guzzlehttp/guzzle": "^7.0.1",
5050
"php-http/discovery": "^1.13",
5151
"voryx/thruway-common": "^1.0.1",
52-
"react/dns": "^1.0",
53-
"react/event-loop": "^1.0"
52+
"react/dns": "^1.4",
53+
"react/event-loop": "^1.2",
54+
"react/promise": "^2.8"
5455
},
5556
"require-dev": {
5657
"ext-pcntl": "*",

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ version: '2'
33
services:
44
dev:
55
# when image publishing gets sorted:
6-
# image: enqueue/dev:${PHP_VERSION}
6+
# image: enqueue/dev:${PHP_VERSION:-7.4}
77
build:
88
context: docker
99
args:
10-
PHP_VERSION: "${PHP_VERSION:-7.3}"
10+
PHP_VERSION: "${PHP_VERSION:-7.4}"
1111
depends_on:
1212
- rabbitmq
1313
- mysql

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
ARG PHP_VERSION=7.3
1+
ARG PHP_VERSION=7.4
22
FROM makasim/nginx-php-fpm:${PHP_VERSION}-all-exts
33

44
ARG PHP_VERSION

pkg/dbal/DbalConsumer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,14 @@ public function reject(Message $message, bool $requeue = false): void
100100
{
101101
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);
102102

103-
$this->acknowledge($message);
104-
105103
if ($requeue) {
106104
$message = clone $message;
107105
$message->setRedelivered(false);
108106

109107
$this->getContext()->createProducer()->send($this->queue, $message);
110108
}
109+
110+
$this->acknowledge($message);
111111
}
112112

113113
protected function getContext(): DbalContext

pkg/dbal/DbalConsumerHelperTrait.php

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Doctrine\DBAL\Connection;
88
use Doctrine\DBAL\Exception\RetryableException;
9-
use Doctrine\DBAL\Types\Type;
109
use Ramsey\Uuid\Uuid;
1110

1211
trait DbalConsumerHelperTrait
@@ -39,7 +38,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
3938
->addOrderBy('priority', 'asc')
4039
->addOrderBy('published_at', 'asc')
4140
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
42-
->setParameter('delayedUntil', $now, Type::INTEGER)
41+
->setParameter('delayedUntil', $now, DbalType::INTEGER)
4342
->setMaxResults(1);
4443

4544
$update = $this->getConnection()->createQueryBuilder()
@@ -48,8 +47,8 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
4847
->set('redeliver_after', ':redeliverAfter')
4948
->andWhere('id = :messageId')
5049
->andWhere('delivery_id IS NULL')
51-
->setParameter('deliveryId', $deliveryId, Type::GUID)
52-
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
50+
->setParameter('deliveryId', $deliveryId, DbalType::GUID)
51+
->setParameter('redeliverAfter', $now + $redeliveryDelay, DbalType::BIGINT)
5352
;
5453

5554
while (microtime(true) < $endAt) {
@@ -60,14 +59,14 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
6059
}
6160

6261
$update
63-
->setParameter('messageId', $result['id'], Type::GUID);
62+
->setParameter('messageId', $result['id'], DbalType::GUID);
6463

6564
if ($update->execute()) {
6665
$deliveredMessage = $this->getConnection()->createQueryBuilder()
6766
->select('*')
6867
->from($this->getContext()->getTableName())
6968
->andWhere('delivery_id = :deliveryId')
70-
->setParameter('deliveryId', $deliveryId, Type::GUID)
69+
->setParameter('deliveryId', $deliveryId, DbalType::GUID)
7170
->setMaxResults(1)
7271
->execute()
7372
->fetch();
@@ -103,9 +102,9 @@ protected function redeliverMessages(): void
103102
->set('redelivered', ':redelivered')
104103
->andWhere('redeliver_after < :now')
105104
->andWhere('delivery_id IS NOT NULL')
106-
->setParameter(':now', time(), Type::BIGINT)
107-
->setParameter('deliveryId', null, Type::GUID)
108-
->setParameter('redelivered', true, Type::BOOLEAN)
105+
->setParameter(':now', time(), DbalType::BIGINT)
106+
->setParameter('deliveryId', null, DbalType::GUID)
107+
->setParameter('redelivered', true, DbalType::BOOLEAN)
109108
;
110109

111110
try {
@@ -131,8 +130,8 @@ protected function removeExpiredMessages(): void
131130
->andWhere('delivery_id IS NULL')
132131
->andWhere('redelivered = :redelivered')
133132

134-
->setParameter(':now', time(), Type::BIGINT)
135-
->setParameter('redelivered', false, Type::BOOLEAN)
133+
->setParameter(':now', time(), DbalType::BIGINT)
134+
->setParameter('redelivered', false, DbalType::BOOLEAN)
136135
;
137136

138137
try {
@@ -153,7 +152,7 @@ private function deleteMessage(string $deliveryId): void
153152
$this->getConnection()->delete(
154153
$this->getContext()->getTableName(),
155154
['delivery_id' => $deliveryId],
156-
['delivery_id' => Type::GUID]
155+
['delivery_id' => DbalType::GUID]
157156
);
158157
}
159158
}

pkg/dbal/DbalContext.php

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Doctrine\DBAL\Connection;
88
use Doctrine\DBAL\Schema\Table;
9-
use Doctrine\DBAL\Types\Type;
109
use Interop\Queue\Consumer;
1110
use Interop\Queue\Context;
1211
use Interop\Queue\Destination;
@@ -39,26 +38,21 @@ class DbalContext implements Context
3938
* Callable must return instance of Doctrine\DBAL\Connection once called.
4039
*
4140
* @param Connection|callable $connection
42-
* @param array $config
4341
*/
4442
public function __construct($connection, array $config = [])
4543
{
4644
$this->config = array_replace([
4745
'table_name' => 'enqueue',
4846
'polling_interval' => null,
49-
'subscription_interval' => null,
47+
'subscription_polling_interval' => null,
5048
], $config);
5149

5250
if ($connection instanceof Connection) {
5351
$this->connection = $connection;
5452
} elseif (is_callable($connection)) {
5553
$this->connectionFactory = $connection;
5654
} else {
57-
throw new \InvalidArgumentException(sprintf(
58-
'The connection argument must be either %s or callable that returns %s.',
59-
Connection::class,
60-
Connection::class
61-
));
55+
throw new \InvalidArgumentException(sprintf('The connection argument must be either %s or callable that returns %s.', Connection::class, Connection::class));
6256
}
6357
}
6458

@@ -136,8 +130,8 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
136130
$consumer->setRedeliveryDelay($this->config['redelivery_delay']);
137131
}
138132

139-
if (isset($this->config['subscription_interval'])) {
140-
$consumer->setSubscriptionInterval($this->config['subscription_interval']);
133+
if (isset($this->config['subscription_polling_interval'])) {
134+
$consumer->setPollingInterval($this->config['subscription_polling_interval']);
141135
}
142136

143137
return $consumer;
@@ -188,7 +182,7 @@ public function purgeQueue(Queue $queue): void
188182
$this->getDbalConnection()->delete(
189183
$this->getTableName(),
190184
['queue' => $queue->getQueueName()],
191-
['queue' => Type::STRING]
185+
['queue' => DbalType::STRING]
192186
);
193187
}
194188

@@ -207,10 +201,7 @@ public function getDbalConnection(): Connection
207201
if (false == $this->connection) {
208202
$connection = call_user_func($this->connectionFactory);
209203
if (false == $connection instanceof Connection) {
210-
throw new \LogicException(sprintf(
211-
'The factory must return instance of Doctrine\DBAL\Connection. It returns %s',
212-
is_object($connection) ? get_class($connection) : gettype($connection)
213-
));
204+
throw new \LogicException(sprintf('The factory must return instance of Doctrine\DBAL\Connection. It returns %s', is_object($connection) ? get_class($connection) : gettype($connection)));
214205
}
215206

216207
$this->connection = $connection;
@@ -229,24 +220,25 @@ public function createDataBaseTable(): void
229220

230221
$table = new Table($this->getTableName());
231222

232-
$table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]);
233-
$table->addColumn('published_at', Type::BIGINT);
234-
$table->addColumn('body', Type::TEXT, ['notnull' => false]);
235-
$table->addColumn('headers', Type::TEXT, ['notnull' => false]);
236-
$table->addColumn('properties', Type::TEXT, ['notnull' => false]);
237-
$table->addColumn('redelivered', Type::BOOLEAN, ['notnull' => false]);
238-
$table->addColumn('queue', Type::STRING);
239-
$table->addColumn('priority', Type::SMALLINT, ['notnull' => false]);
240-
$table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]);
241-
$table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]);
242-
$table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
243-
$table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]);
223+
$table->addColumn('id', DbalType::GUID, ['length' => 16, 'fixed' => true]);
224+
$table->addColumn('published_at', DbalType::BIGINT);
225+
$table->addColumn('body', DbalType::TEXT, ['notnull' => false]);
226+
$table->addColumn('headers', DbalType::TEXT, ['notnull' => false]);
227+
$table->addColumn('properties', DbalType::TEXT, ['notnull' => false]);
228+
$table->addColumn('redelivered', DbalType::BOOLEAN, ['notnull' => false]);
229+
$table->addColumn('queue', DbalType::STRING);
230+
$table->addColumn('priority', DbalType::SMALLINT, ['notnull' => false]);
231+
$table->addColumn('delayed_until', DbalType::BIGINT, ['notnull' => false]);
232+
$table->addColumn('time_to_live', DbalType::BIGINT, ['notnull' => false]);
233+
$table->addColumn('delivery_id', DbalType::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
234+
$table->addColumn('redeliver_after', DbalType::BIGINT, ['notnull' => false]);
244235

245236
$table->setPrimaryKey(['id']);
246237
$table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']);
247238

248239
$table->addIndex(['redeliver_after', 'delivery_id']);
249240
$table->addIndex(['time_to_live', 'delivery_id']);
241+
$table->addIndex(['delivery_id']);
250242

251243
$sm->createTable($table);
252244
}

pkg/dbal/DbalProducer.php

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace Enqueue\Dbal;
66

7-
use Doctrine\DBAL\Types\Type;
87
use Interop\Queue\Destination;
98
use Interop\Queue\Exception\Exception;
109
use Interop\Queue\Exception\InvalidDestinationException;
@@ -35,9 +34,6 @@ class DbalProducer implements Producer
3534
*/
3635
private $context;
3736

38-
/**
39-
* @param DbalContext $context
40-
*/
4137
public function __construct(DbalContext $context)
4238
{
4339
$this->context = $context;
@@ -85,49 +81,43 @@ public function send(Destination $destination, Message $message): void
8581
$delay = $message->getDeliveryDelay();
8682
if ($delay) {
8783
if (!is_int($delay)) {
88-
throw new \LogicException(sprintf(
89-
'Delay must be integer but got: "%s"',
90-
is_object($delay) ? get_class($delay) : gettype($delay)
91-
));
84+
throw new \LogicException(sprintf('Delay must be integer but got: "%s"', is_object($delay) ? get_class($delay) : gettype($delay)));
9285
}
9386

9487
if ($delay <= 0) {
9588
throw new \LogicException(sprintf('Delay must be positive integer but got: "%s"', $delay));
9689
}
9790

98-
$dbalMessage['delayed_until'] = time() + (int) $delay / 1000;
91+
$dbalMessage['delayed_until'] = time() + (int) ($delay / 1000);
9992
}
10093

10194
$timeToLive = $message->getTimeToLive();
10295
if ($timeToLive) {
10396
if (!is_int($timeToLive)) {
104-
throw new \LogicException(sprintf(
105-
'TimeToLive must be integer but got: "%s"',
106-
is_object($timeToLive) ? get_class($timeToLive) : gettype($timeToLive)
107-
));
97+
throw new \LogicException(sprintf('TimeToLive must be integer but got: "%s"', is_object($timeToLive) ? get_class($timeToLive) : gettype($timeToLive)));
10898
}
10999

110100
if ($timeToLive <= 0) {
111101
throw new \LogicException(sprintf('TimeToLive must be positive integer but got: "%s"', $timeToLive));
112102
}
113103

114-
$dbalMessage['time_to_live'] = time() + (int) $timeToLive / 1000;
104+
$dbalMessage['time_to_live'] = time() + (int) ($timeToLive / 1000);
115105
}
116106

117107
try {
118108
$rowsAffected = $this->context->getDbalConnection()->insert($this->context->getTableName(), $dbalMessage, [
119-
'id' => Type::GUID,
120-
'published_at' => Type::INTEGER,
121-
'body' => Type::TEXT,
122-
'headers' => Type::TEXT,
123-
'properties' => Type::TEXT,
124-
'priority' => Type::SMALLINT,
125-
'queue' => Type::STRING,
126-
'time_to_live' => Type::INTEGER,
127-
'delayed_until' => Type::INTEGER,
128-
'redelivered' => Type::SMALLINT,
129-
'delivery_id' => Type::STRING,
130-
'redeliver_after' => Type::BIGINT,
109+
'id' => DbalType::GUID,
110+
'published_at' => DbalType::INTEGER,
111+
'body' => DbalType::TEXT,
112+
'headers' => DbalType::TEXT,
113+
'properties' => DbalType::TEXT,
114+
'priority' => DbalType::SMALLINT,
115+
'queue' => DbalType::STRING,
116+
'time_to_live' => DbalType::INTEGER,
117+
'delayed_until' => DbalType::INTEGER,
118+
'redelivered' => DbalType::SMALLINT,
119+
'delivery_id' => DbalType::STRING,
120+
'redeliver_after' => DbalType::BIGINT,
131121
]);
132122

133123
if (1 !== $rowsAffected) {

0 commit comments

Comments
 (0)