1111use Symfony \Component \Console \Input \InputOption ;
1212use Symfony \Component \Console \Output \OutputInterface ;
1313use Magento \Framework \MessageQueue \ConsumerFactory ;
14- use Magento \MessageQueue \ Model \ Cron \ ConsumersRunner \ PidConsumerManager ;
14+ use Magento \Framework \ Lock \ LockManagerInterface ;
1515
1616/**
1717 * Command for starting MessageQueue consumers.
@@ -22,6 +22,7 @@ class StartConsumerCommand extends Command
2222 const OPTION_NUMBER_OF_MESSAGES = 'max-messages ' ;
2323 const OPTION_BATCH_SIZE = 'batch-size ' ;
2424 const OPTION_AREACODE = 'area-code ' ;
25+ const OPTION_SINGLE_THREAD = 'single-thread ' ;
2526 const PID_FILE_PATH = 'pid-file-path ' ;
2627 const COMMAND_QUEUE_CONSUMERS_START = 'queue:consumers:start ' ;
2728
@@ -36,9 +37,9 @@ class StartConsumerCommand extends Command
3637 private $ appState ;
3738
3839 /**
39- * @var PidConsumerManager
40+ * @var LockManagerInterface
4041 */
41- private $ pidConsumerManager ;
42+ private $ lockManager ;
4243
4344 /**
4445 * StartConsumerCommand constructor.
@@ -47,54 +48,60 @@ class StartConsumerCommand extends Command
4748 * @param \Magento\Framework\App\State $appState
4849 * @param ConsumerFactory $consumerFactory
4950 * @param string $name
50- * @param PidConsumerManager $pidConsumerManager
51+ * @param LockManagerInterface $lockManager
5152 */
5253 public function __construct (
5354 \Magento \Framework \App \State $ appState ,
5455 ConsumerFactory $ consumerFactory ,
5556 $ name = null ,
56- PidConsumerManager $ pidConsumerManager = null
57+ LockManagerInterface $ lockManager = null
5758 ) {
5859 $ this ->appState = $ appState ;
5960 $ this ->consumerFactory = $ consumerFactory ;
60- $ this ->pidConsumerManager = $ pidConsumerManager ?: \Magento \Framework \App \ObjectManager::getInstance ()
61- ->get (PidConsumerManager ::class);
61+ $ this ->lockManager = $ lockManager ?: \Magento \Framework \App \ObjectManager::getInstance ()
62+ ->get (LockManagerInterface ::class);
6263 parent ::__construct ($ name );
6364 }
6465
6566 /**
66- * { @inheritdoc}
67+ * @inheritdoc
6768 */
6869 protected function execute (InputInterface $ input , OutputInterface $ output )
6970 {
7071 $ consumerName = $ input ->getArgument (self ::ARGUMENT_CONSUMER );
7172 $ numberOfMessages = $ input ->getOption (self ::OPTION_NUMBER_OF_MESSAGES );
7273 $ batchSize = (int )$ input ->getOption (self ::OPTION_BATCH_SIZE );
7374 $ areaCode = $ input ->getOption (self ::OPTION_AREACODE );
74- $ pidFilePath = $ input ->getOption (self ::PID_FILE_PATH );
7575
76- if ($ pidFilePath && $ this ->pidConsumerManager ->isRun ($ pidFilePath )) {
77- $ output ->writeln ('<error>Consumer with the same PID is running</error> ' );
78- return \Magento \Framework \Console \Cli::RETURN_FAILURE ;
76+ if ($ input ->getOption (self ::PID_FILE_PATH )) {
77+ $ input ->setOption (self ::OPTION_SINGLE_THREAD , true );
7978 }
8079
81- if ($ pidFilePath ) {
82- $ this ->pidConsumerManager ->savePid ($ pidFilePath );
80+ $ singleThread = $ input ->getOption (self ::OPTION_SINGLE_THREAD );
81+
82+ if ($ singleThread && $ this ->lockManager ->isLocked (md5 ($ consumerName ))) { //phpcs:ignore
83+ $ output ->writeln ('<error>Consumer with the same name is running</error> ' );
84+ return \Magento \Framework \Console \Cli::RETURN_FAILURE ;
8385 }
8486
85- if ($ areaCode !== null ) {
86- $ this ->appState ->setAreaCode ($ areaCode );
87- } else {
88- $ this ->appState ->setAreaCode ('global ' );
87+ if ($ singleThread ) {
88+ $ this ->lockManager ->lock (md5 ($ consumerName )); //phpcs:ignore
8989 }
9090
91+ $ this ->appState ->setAreaCode ($ areaCode ?? 'global ' );
92+
9193 $ consumer = $ this ->consumerFactory ->get ($ consumerName , $ batchSize );
9294 $ consumer ->process ($ numberOfMessages );
95+
96+ if ($ singleThread ) {
97+ $ this ->lockManager ->unlock (md5 ($ consumerName )); //phpcs:ignore
98+ }
99+
93100 return \Magento \Framework \Console \Cli::RETURN_SUCCESS ;
94101 }
95102
96103 /**
97- * { @inheritdoc}
104+ * @inheritdoc
98105 */
99106 protected function configure ()
100107 {
@@ -125,11 +132,17 @@ protected function configure()
125132 'The preferred area (global, adminhtml, etc...) '
126133 . 'default is global. '
127134 );
135+ $ this ->addOption (
136+ self ::OPTION_SINGLE_THREAD ,
137+ null ,
138+ InputOption::VALUE_NONE ,
139+ 'This option prevents running multiple copies of one consumer simultaneously. '
140+ );
128141 $ this ->addOption (
129142 self ::PID_FILE_PATH ,
130143 null ,
131144 InputOption::VALUE_REQUIRED ,
132- 'The file path for saving PID '
145+ 'The file path for saving PID (This option is deprecated, use --single-thread instead) '
133146 );
134147 $ this ->setHelp (
135148 <<<HELP
@@ -150,8 +163,12 @@ protected function configure()
150163To specify the preferred area:
151164
152165 <comment>%command.full_name% someConsumer --area-code='adminhtml'</comment>
166+
167+ To do not run multiple copies of one consumer simultaneously:
168+
169+ <comment>%command.full_name% someConsumer --single-thread'</comment>
153170
154- To save PID enter path:
171+ To save PID enter path (This option is deprecated, use --single-thread instead) :
155172
156173 <comment>%command.full_name% someConsumer --pid-file-path='/var/someConsumer.pid'</comment>
157174HELP
0 commit comments