diff --git a/Configuration.php b/Configuration.php index 9d5d2aa..d3b03c4 100644 --- a/Configuration.php +++ b/Configuration.php @@ -30,6 +30,7 @@ class Configuration extends Component */ const DEFAULTS = [ 'auto_declare' => true, + 'consumer_standoff' => 0, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, @@ -122,6 +123,7 @@ class Configuration extends Component ]; public $auto_declare = null; + public $consumer_standoff = null; public $connections = []; public $producers = []; public $consumers = []; @@ -250,6 +252,10 @@ protected function validateTopLevel() throw new InvalidConfigException("Option `auto_declare` should be of type boolean."); } + if (($this->consumer_standoff !== null) && !is_int($this->consumer_standoff)) { + throw new InvalidConfigException("Option `consumer_standoff` should be of type int."); + } + if (!is_array($this->logger)) { throw new InvalidConfigException("Option `logger` should be of type array."); } @@ -436,6 +442,9 @@ protected function completeWithDefaults() if (null === $this->auto_declare) { $this->auto_declare = $defaults['auto_declare']; } + if (null === $this->consumer_standoff) { + $this->consumer_standoff = $defaults['consumer_standoff']; + } if (empty($this->logger)) { $this->logger = $defaults['logger']; } else { diff --git a/DependencyInjection.php b/DependencyInjection.php index 0c1fe92..250c469 100644 --- a/DependencyInjection.php +++ b/DependencyInjection.php @@ -116,9 +116,10 @@ protected function registerProducers(Configuration $config) protected function registerConsumers(Configuration $config) { $autoDeclare = $config->auto_declare; + $consumerStandoff = $config->consumer_standoff; foreach ($config->consumers as $options) { $serviceAlias = sprintf(Configuration::CONSUMER_SERVICE_NAME, $options['name']); - \Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare) { + \Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare, $consumerStandoff) { /** * @var $connection AbstractConnection */ @@ -144,6 +145,7 @@ protected function registerConsumers(Configuration $config) \Yii::$container->invoke([$consumer, 'setIdleTimeoutExitCode'], [$options['idle_timeout_exit_code']]); \Yii::$container->invoke([$consumer, 'setProceedOnException'], [$options['proceed_on_exception']]); \Yii::$container->invoke([$consumer, 'setDeserializer'], [$options['deserializer']]); + \Yii::$container->invoke([$consumer, 'setStandoff'], [$consumerStandoff]); return $consumer; }); diff --git a/README.md b/README.md index 7730567..3cdbf2d 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,13 @@ As PHP daemon especially based upon a framework may be prone to memory leaks, it #### Auto-declare By default extension configured in auto-declare mode, which means that on every message published exchanges, queues and bindings will be checked and created if missing. If performance means much to your application you should disable that feature in configuration and use console commands to declare and delete routing schema by yourself. +#### Consumer standoff +PHP processes have a reputation of leaking memory, especially when running as a daemon. It is therefore prudent to limit the runtime of AMQP consumers by frequently reloading them. This is achieved by terminating consumers after handling a maximum number of messages. + +However, when using certain service managers (`supervisord`, for example), this presents a problem. If the number of consumed messages is reached within a certain time period, the service manager will consider the consumer to have exited too soon and assume there is a problem with the service. Removing this threshold in the service manager may lead to infinite restart loops. + +The chosen solution is to implement a standoff period that is applied before the actual consumer loop is started. This will ensure that the controller action can actually be executed because the bootstrapping has succeeded, but the process waits long enough to satisfy the service manager (the threshold may differ per service manager). The default standoff period is 0 seconds due to the nature of this problem. + Usage ------------- As the consumer worker will read messages from the queue, execute a callback method and pass a message to it. @@ -248,6 +255,7 @@ All configuration options: ```php $rabbitmq_defaults = [ 'auto_declare' => true, + 'consumer_standoff' => 0, 'connections' => [ [ 'name' => self::DEFAULT_CONNECTION_NAME, diff --git a/components/Consumer.php b/components/Consumer.php index 08d04e1..384d8bd 100644 --- a/components/Consumer.php +++ b/components/Consumer.php @@ -34,6 +34,8 @@ class Consumer extends BaseRabbitMQ protected $name = 'unnamed'; + protected $standoff = 0; + private $id; private $target; @@ -159,6 +161,22 @@ public function getName(): string return $this->name; } + /** + * @param int $duration in seconds + */ + public function setStandoff(int $duration) + { + $this->standoff = $duration; + } + + /** + * @return int + */ + public function getStandoff(): int + { + return $this->standoff; + } + /** * Resets the consumed property. * Use when you want to call start() or consume() multiple times. @@ -483,6 +501,7 @@ protected function setup() $this->routing->declareAll(); } $this->setQosOptions(); + sleep($this->standoff); $this->startConsuming(); } } diff --git a/composer.json b/composer.json index b72ac34..2600823 100644 --- a/composer.json +++ b/composer.json @@ -26,5 +26,11 @@ }, "extra": { "bootstrap": "mikemadisonweb\\rabbitmq\\DependencyInjection" - } + }, + "repositories": [ + { + "type": "composer", + "url": "https://asset-packagist.org" + } + ] }