From a7019ca76a1754356c8aecc292417bcc216f279c Mon Sep 17 00:00:00 2001 From: Derk van den Bergh Date: Thu, 11 Aug 2022 11:17:18 +0200 Subject: [PATCH 1/4] add standoff to consumer --- Configuration.php | 9 +++++++++ DependencyInjection.php | 2 ++ README.md | 8 ++++++++ components/Consumer.php | 19 +++++++++++++++++++ composer.json | 8 +++++++- 5 files changed, 45 insertions(+), 1 deletion(-) diff --git a/Configuration.php b/Configuration.php index 9d5d2aa..2c546d1 100644 --- a/Configuration.php +++ b/Configuration.php @@ -30,6 +30,7 @@ class Configuration extends Component */ const DEFAULTS = [ 'auto_declare' => true, + 'consumer_standoff' => 3, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, @@ -122,6 +123,7 @@ class Configuration extends Component ]; public $auto_declare = null; + public $consumer_standoff = null; public $connections = []; public $producers = []; public $consumers = []; @@ -250,6 +252,10 @@ protected function validateTopLevel() throw new InvalidConfigException("Option `auto_declare` should be of type boolean."); } + if (($this->consumer_standoff !== null) && !is_int($this->consumer_standoff)) { + throw new InvalidConfigException("Option `consumer_standoff` should be of type int."); + } + if (!is_array($this->logger)) { throw new InvalidConfigException("Option `logger` should be of type array."); } @@ -436,6 +442,9 @@ protected function completeWithDefaults() if (null === $this->auto_declare) { $this->auto_declare = $defaults['auto_declare']; } + if (null === $this->consumer_standoff) { + $this->consumer_standoff = $defaults['consumer_standoff']; + } if (empty($this->logger)) { $this->logger = $defaults['logger']; } else { diff --git a/DependencyInjection.php b/DependencyInjection.php index 0c1fe92..70893da 100644 --- a/DependencyInjection.php +++ b/DependencyInjection.php @@ -116,6 +116,7 @@ protected function registerProducers(Configuration $config) protected function registerConsumers(Configuration $config) { $autoDeclare = $config->auto_declare; + $consumerStandoff = $config->consumer_standoff; foreach ($config->consumers as $options) { $serviceAlias = sprintf(Configuration::CONSUMER_SERVICE_NAME, $options['name']); \Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare) { @@ -144,6 +145,7 @@ protected function registerConsumers(Configuration $config) \Yii::$container->invoke([$consumer, 'setIdleTimeoutExitCode'], [$options['idle_timeout_exit_code']]); \Yii::$container->invoke([$consumer, 'setProceedOnException'], [$options['proceed_on_exception']]); \Yii::$container->invoke([$consumer, 'setDeserializer'], [$options['deserializer']]); + \Yii::$container->invoke([$consumer, 'setStandoff'], [$options['deserializer']]); return $consumer; }); diff --git a/README.md b/README.md index 7730567..b155391 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,13 @@ As PHP daemon especially based upon a framework may be prone to memory leaks, it #### Auto-declare By default extension configured in auto-declare mode, which means that on every message published exchanges, queues and bindings will be checked and created if missing. If performance means much to your application you should disable that feature in configuration and use console commands to declare and delete routing schema by yourself. +#### Consumer standoff +PHP processes have a reputation of leaking memory, especially when running as a daemon. It is therefore prudent to limit the runtime of AMQP consumers by frequently reloading them. This is achieved by terminating consumers after handling a maximum number of messages. + +However, when using certain service managers (`supervisord`, for example), this presents a problem. If the number of consumed messages is reached within a certain time period, the service manager will consider the consumer to have exited too soon and assume there is a problem with the service. Removing this threshold in the service manager may lead to infinite restart loops. + +The chosen solution is to implement a standoff period that is applied before the actual consumer loop is started. This will ensure that the controller action can actually be executed because the bootstrapping has succeeded, but the process waits long enough to satisfy the service manager (the threshold may differ per service manager). The default standoff period is 3 seconds, but it can be adjusted in the module settings. + Usage ------------- As the consumer worker will read messages from the queue, execute a callback method and pass a message to it. @@ -248,6 +255,7 @@ All configuration options: ```php $rabbitmq_defaults = [ 'auto_declare' => true, + 'consumer_standoff' => 3, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, diff --git a/components/Consumer.php b/components/Consumer.php index 08d04e1..384d8bd 100644 --- a/components/Consumer.php +++ b/components/Consumer.php @@ -34,6 +34,8 @@ class Consumer extends BaseRabbitMQ protected $name = 'unnamed'; + protected $standoff = 0; + private $id; private $target; @@ -159,6 +161,22 @@ public function getName(): string return $this->name; } + /** + * @param int $duration in seconds + */ + public function setStandoff(int $duration) + { + $this->standoff = $duration; + } + + /** + * @return int + */ + public function getStandoff(): int + { + return $this->standoff; + } + /** * Resets the consumed property. * Use when you want to call start() or consume() multiple times. @@ -483,6 +501,7 @@ protected function setup() $this->routing->declareAll(); } $this->setQosOptions(); + sleep($this->standoff); $this->startConsuming(); } } diff --git a/composer.json b/composer.json index b72ac34..2600823 100644 --- a/composer.json +++ b/composer.json @@ -26,5 +26,11 @@ }, "extra": { "bootstrap": "mikemadisonweb\\rabbitmq\\DependencyInjection" - } + }, + "repositories": [ + { + "type": "composer", + "url": "https://asset-packagist.org" + } + ] } From b65896e48a7bf5cb85257be723459c028e734588 Mon Sep 17 00:00:00 2001 From: Derk van den Bergh Date: Thu, 11 Aug 2022 11:19:46 +0200 Subject: [PATCH 2/4] adjust default value --- Configuration.php | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Configuration.php b/Configuration.php index 2c546d1..d3b03c4 100644 --- a/Configuration.php +++ b/Configuration.php @@ -30,7 +30,7 @@ class Configuration extends Component */ const DEFAULTS = [ 'auto_declare' => true, - 'consumer_standoff' => 3, + 'consumer_standoff' => 0, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, diff --git a/README.md b/README.md index b155391..d25cbf1 100644 --- a/README.md +++ b/README.md @@ -202,7 +202,7 @@ PHP processes have a reputation of leaking memory, especially when running as a However, when using certain service managers (`supervisord`, for example), this presents a problem. If the number of consumed messages is reached within a certain time period, the service manager will consider the consumer to have exited too soon and assume there is a problem with the service. Removing this threshold in the service manager may lead to infinite restart loops. -The chosen solution is to implement a standoff period that is applied before the actual consumer loop is started. This will ensure that the controller action can actually be executed because the bootstrapping has succeeded, but the process waits long enough to satisfy the service manager (the threshold may differ per service manager). The default standoff period is 3 seconds, but it can be adjusted in the module settings. +The chosen solution is to implement a standoff period that is applied before the actual consumer loop is started. This will ensure that the controller action can actually be executed because the bootstrapping has succeeded, but the process waits long enough to satisfy the service manager (the threshold may differ per service manager). The default standoff period is 0 seconds due to the nature of this problem. Usage ------------- From d3f5cbde60bd2d6dabeb0c7304382654adbf8c90 Mon Sep 17 00:00:00 2001 From: Derk van den Bergh Date: Thu, 11 Aug 2022 11:21:04 +0200 Subject: [PATCH 3/4] adjust default value in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d25cbf1..3cdbf2d 100644 --- a/README.md +++ b/README.md @@ -255,7 +255,7 @@ All configuration options: ```php $rabbitmq_defaults = [ 'auto_declare' => true, - 'consumer_standoff' => 3, + 'consumer_standoff' => 0, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, From 6f1cdf51f0c1acfc148b5484e9f7aceaf2e9959e Mon Sep 17 00:00:00 2001 From: Derk van den Bergh Date: Thu, 11 Aug 2022 12:02:55 +0200 Subject: [PATCH 4/4] fix dependecy injection type error --- DependencyInjection.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DependencyInjection.php b/DependencyInjection.php index 70893da..250c469 100644 --- a/DependencyInjection.php +++ b/DependencyInjection.php @@ -119,7 +119,7 @@ protected function registerConsumers(Configuration $config) $consumerStandoff = $config->consumer_standoff; foreach ($config->consumers as $options) { $serviceAlias = sprintf(Configuration::CONSUMER_SERVICE_NAME, $options['name']); - \Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare) { + \Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare, $consumerStandoff) { /** * @var $connection AbstractConnection */ @@ -145,7 +145,7 @@ protected function registerConsumers(Configuration $config) \Yii::$container->invoke([$consumer, 'setIdleTimeoutExitCode'], [$options['idle_timeout_exit_code']]); \Yii::$container->invoke([$consumer, 'setProceedOnException'], [$options['proceed_on_exception']]); \Yii::$container->invoke([$consumer, 'setDeserializer'], [$options['deserializer']]); - \Yii::$container->invoke([$consumer, 'setStandoff'], [$options['deserializer']]); + \Yii::$container->invoke([$consumer, 'setStandoff'], [$consumerStandoff]); return $consumer; });