From 3aa0b9cff22450d7b9464a667747557f60934ea8 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Wed, 17 Jul 2019 17:30:40 +0100 Subject: [PATCH 1/7] conditional for setting SASL params --- src/PubSubConnectionFactory.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/PubSubConnectionFactory.php b/src/PubSubConnectionFactory.php index e0e97fd..f0b2edf 100644 --- a/src/PubSubConnectionFactory.php +++ b/src/PubSubConnectionFactory.php @@ -95,6 +95,16 @@ protected function makeKafkaAdapter(array $config) $conf->set('metadata.broker.list', $config['brokers']); $conf->set('enable.auto.commit', 'false'); $conf->set('offset.store.method', 'broker'); + +// $conf->set('request.timeout.ms', '20000'); +// $conf->set('retry.backoff.ms', '500'); + if (array_key_exists('sasl_username', $config)) { + $conf->set('sasl.username', array_get($config, 'sasl_username')); + $conf->set('sasl.password', array_get($config, 'sasl_password')); + $conf->set('sasl.mechanisms', array_get($config, 'sasl_mechanisms','PLAIN')); + $conf->set('security.protocol', array_get($config, 'sasl_protocol','SASL_SSL')); + } + $conf->setDefaultTopicConf($topicConf); $consumer = $this->container->makeWith('pubsub.kafka.consumer', ['conf' => $conf]); From dae857beb50feeb4514dd8c5d363b2c057dc8149 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Thu, 18 Jul 2019 16:37:13 +0100 Subject: [PATCH 2/7] detect security options by checking for 'security_protocol' in config --- src/PubSubConnectionFactory.php | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/PubSubConnectionFactory.php b/src/PubSubConnectionFactory.php index f0b2edf..574b169 100644 --- a/src/PubSubConnectionFactory.php +++ b/src/PubSubConnectionFactory.php @@ -96,13 +96,20 @@ protected function makeKafkaAdapter(array $config) $conf->set('enable.auto.commit', 'false'); $conf->set('offset.store.method', 'broker'); -// $conf->set('request.timeout.ms', '20000'); -// $conf->set('retry.backoff.ms', '500'); - if (array_key_exists('sasl_username', $config)) { - $conf->set('sasl.username', array_get($config, 'sasl_username')); - $conf->set('sasl.password', array_get($config, 'sasl_password')); - $conf->set('sasl.mechanisms', array_get($config, 'sasl_mechanisms','PLAIN')); - $conf->set('security.protocol', array_get($config, 'sasl_protocol','SASL_SSL')); + // set security options if required + if (array_key_exists('security_protocol', $config)) { + switch ($config['security_protocol']) { + case 'SASL_SSL': + case 'SASL_PAINTEXT': + $conf->set('security.protocol', array_get($config, 'security_protocol','SASL_SSL')); + $conf->set('sasl.username', array_get($config, 'sasl_username')); + $conf->set('sasl.password', array_get($config, 'sasl_password')); + $conf->set('sasl.mechanisms', array_get($config, 'sasl_mechanisms','PLAIN')); + break; + + default: + break; + } } $conf->setDefaultTopicConf($topicConf); From fa710015122a50cb613bdd012b5566044da47398 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Thu, 18 Jul 2019 17:31:54 +0100 Subject: [PATCH 3/7] include config when creating producer --- src/PubSubConnectionFactory.php | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/PubSubConnectionFactory.php b/src/PubSubConnectionFactory.php index 574b169..111c3ce 100644 --- a/src/PubSubConnectionFactory.php +++ b/src/PubSubConnectionFactory.php @@ -82,31 +82,27 @@ protected function makeRedisAdapter(array $config) */ protected function makeKafkaAdapter(array $config) { - // create producer - $producer = $this->container->makeWith('pubsub.kafka.producer'); - $producer->addBrokers($config['brokers']); - - // create consumer + // create default topic $topicConf = $this->container->makeWith('pubsub.kafka.topic_conf'); $topicConf->set('auto.offset.reset', 'smallest'); + // create config $conf = $this->container->makeWith('pubsub.kafka.conf'); $conf->set('group.id', array_get($config, 'consumer_group_id', 'php-pubsub')); $conf->set('metadata.broker.list', $config['brokers']); $conf->set('enable.auto.commit', 'false'); $conf->set('offset.store.method', 'broker'); - // set security options if required if (array_key_exists('security_protocol', $config)) { switch ($config['security_protocol']) { case 'SASL_SSL': case 'SASL_PAINTEXT': - $conf->set('security.protocol', array_get($config, 'security_protocol','SASL_SSL')); - $conf->set('sasl.username', array_get($config, 'sasl_username')); - $conf->set('sasl.password', array_get($config, 'sasl_password')); - $conf->set('sasl.mechanisms', array_get($config, 'sasl_mechanisms','PLAIN')); + $conf->set('security.protocol', array_get($config, 'security_protocol', 'SASL_SSL')); + $conf->set('sasl.username', array_get($config, 'sasl_username', 'sasl_username')); + $conf->set('sasl.password', array_get($config, 'sasl_password', 'sasl_password')); + $conf->set('sasl.mechanisms', array_get($config, 'sasl_mechanisms', 'PLAIN')); break; - + default: break; } @@ -114,6 +110,11 @@ protected function makeKafkaAdapter(array $config) $conf->setDefaultTopicConf($topicConf); + // create producer + $producer = $this->container->makeWith('pubsub.kafka.producer', ['conf' => $conf]); + $producer->addBrokers($config['brokers']); + + // create consumer $consumer = $this->container->makeWith('pubsub.kafka.consumer', ['conf' => $conf]); return new KafkaPubSubAdapter($producer, $consumer); From 3d0927c3eee86743bf59fe505bdb6a0e9d65caae Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Thu, 18 Jul 2019 17:37:34 +0100 Subject: [PATCH 4/7] use config parameters when creating producer --- src/PubSubServiceProvider.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/PubSubServiceProvider.php b/src/PubSubServiceProvider.php index 2058b9b..cbfe163 100644 --- a/src/PubSubServiceProvider.php +++ b/src/PubSubServiceProvider.php @@ -64,8 +64,8 @@ protected function registerAdapterDependencies() return new \RdKafka\TopicConf(); }); - $this->app->bind('pubsub.kafka.producer', function () { - return new \RdKafka\Producer(); + $this->app->bind('pubsub.kafka.producer', function ($app, $parameters) { + return new \RdKafka\Producer($parameters['conf']); }); $this->app->bind('pubsub.kafka.conf', function () { From 25d8bd0bf5905a613b2733ac2c909d23b35b5d2b Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Fri, 19 Jul 2019 14:51:38 +0100 Subject: [PATCH 5/7] update travis build to replace php-nightly with php 7.2 & 7.3 --- .travis.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b12489d..5b39009 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,8 @@ php: - 5.6 - 7.0 - 7.1 - - nightly + - 7.2 + 7.3 before_script: - composer install From bb346999aa05f9985aa889c20c35b5d489b6087a Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Fri, 19 Jul 2019 14:52:43 +0100 Subject: [PATCH 6/7] travis file syntax fix, add php 7.3 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 5b39009..31f635a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ php: - 7.0 - 7.1 - 7.2 - 7.3 + - 7.3 before_script: - composer install From 6651c8e935c3b8ffe59b192f6a6b69c74ad6dc46 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Mon, 22 Jul 2019 11:13:09 +0100 Subject: [PATCH 7/7] additional php version tests --- .travis.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.travis.yml b/.travis.yml index 31f635a..17c1e75 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,13 @@ php: - 7.1 - 7.2 - 7.3 + - hhvm + - nightly + +matrix: + allow_failures: + - php: hhvm + - php: nightly before_script: - composer install