77use PhpAmqpLib \Exception \AMQPTimeoutException ;
88use PhpAmqpLib \Message \AMQPMessage ;
99
10- final class BatchConsumer extends BaseAmqp implements DequeuerInterface
10+ class BatchConsumer extends BaseAmqp implements DequeuerInterface
1111{
1212 /**
1313 * @var \Closure|callable
1414 */
15- private $ callback ;
15+ protected $ callback ;
1616
1717 /**
1818 * @var bool
1919 */
20- private $ forceStop = false ;
20+ protected $ forceStop = false ;
2121
2222 /**
2323 * @var int
2424 */
25- private $ idleTimeout = 0 ;
25+ protected $ idleTimeout = 0 ;
2626
2727 /**
2828 * @var bool
@@ -32,32 +32,54 @@ final class BatchConsumer extends BaseAmqp implements DequeuerInterface
3232 /**
3333 * @var int
3434 */
35- private $ idleTimeoutExitCode ;
35+ protected $ idleTimeoutExitCode ;
3636
3737 /**
3838 * @var int
3939 */
40- private $ memoryLimit = null ;
40+ protected $ memoryLimit = null ;
4141
4242 /**
4343 * @var int
4444 */
45- private $ prefetchCount ;
45+ protected $ prefetchCount ;
4646
4747 /**
4848 * @var int
4949 */
50- private $ timeoutWait = 3 ;
50+ protected $ timeoutWait = 3 ;
5151
5252 /**
5353 * @var array
5454 */
55- private $ messages = array ();
55+ protected $ messages = array ();
5656
5757 /**
5858 * @var int
5959 */
60- private $ batchCounter = 0 ;
60+ protected $ batchCounter = 0 ;
61+
62+ /**
63+ * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
64+ * any currently running consumption will not be interrupted.
65+ */
66+ protected $ gracefulMaxExecutionDateTime ;
67+
68+ /**
69+ * @param \DateTime|null $dateTime
70+ */
71+ public function setGracefulMaxExecutionDateTime (\DateTime $ dateTime = null )
72+ {
73+ $ this ->gracefulMaxExecutionDateTime = $ dateTime ;
74+ }
75+
76+ /**
77+ * @param int $secondsInTheFuture
78+ */
79+ public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture ($ secondsInTheFuture )
80+ {
81+ $ this ->setGracefulMaxExecutionDateTime (new \DateTime ("+ {$ secondsInTheFuture } seconds " ));
82+ }
6183
6284 /**
6385 * @param \Closure|callable $callback
@@ -80,6 +102,7 @@ public function consume()
80102 $ this ->batchConsume ();
81103 }
82104
105+ $ this ->checkGracefulMaxExecutionDateTime ();
83106 $ this ->maybeStopConsumer ();
84107
85108 $ timeout = $ this ->isEmptyBatch () ? $ this ->getIdleTimeout () : $ this ->getTimeoutWait ();
@@ -530,4 +553,24 @@ public function getMemoryLimit()
530553 {
531554 return $ this ->memoryLimit ;
532555 }
556+
557+ /**
558+ * Check graceful max execution date time and stop if limit is reached
559+ *
560+ * @return void
561+ */
562+ private function checkGracefulMaxExecutionDateTime ()
563+ {
564+ if (!$ this ->gracefulMaxExecutionDateTime ) {
565+ return ;
566+ }
567+
568+ $ now = new \DateTime ();
569+
570+ if ($ this ->gracefulMaxExecutionDateTime > $ now ) {
571+ return ;
572+ }
573+
574+ $ this ->forceStopConsumer ();
575+ }
533576}
0 commit comments