diff --git a/robotframework_pykafka/kafka_helper.py b/robotframework_pykafka/kafka_helper.py index ccaa494..a31b198 100644 --- a/robotframework_pykafka/kafka_helper.py +++ b/robotframework_pykafka/kafka_helper.py @@ -5,7 +5,7 @@ import time from pykafka import KafkaClient from pykafka.common import OffsetType -from utils import * +from robotframework_pykafka.utils import * ################################################## # Kafka helper class. @@ -17,7 +17,7 @@ class kafka_helper: # 1. constructor parameters if they are non-None and non-empty # 2. environment variables KAFKA_HOST and KAFKA_BROKER_VERSION # 3. default values (localhost and 2.3) - def __init__(self, kafkaBrokerHostname = None, kafkaBrokerVersion = None): + def __init__(self, kafkaBrokerHostname=None, kafkaBrokerVersion=None, cafile=None): # Determine the kafka host self._kafkaHost = "" @@ -45,8 +45,22 @@ def __init__(self, kafkaBrokerHostname = None, kafkaBrokerVersion = None): except Exception as e: raise e + self._cafile = "" + if cafile: + self._cafile = cafile + else: + try: + self._cafile = os.environ['CAFILE'] + except KeyError: + pass + # Get a kafka client - self._client = KafkaClient(hosts = self._kafkaHost, broker_version = self._kafkaBrokerVersion) + if self._cafile: + config = SslConfig(cafile=self._cafile) + self._client = KafkaClient(hosts=self._kafkaHost, broker_version=self._kafkaBrokerVersion, + ssl_config=config) + else: + self._client = KafkaClient(hosts=self._kafkaHost, broker_version=self._kafkaBrokerVersion) self._producers = dict() diff --git a/robotframework_pykafka/robotframework_pykafka.py b/robotframework_pykafka/robotframework_pykafka.py index 6c8218e..c6f4f90 100644 --- a/robotframework_pykafka/robotframework_pykafka.py +++ b/robotframework_pykafka/robotframework_pykafka.py @@ -1,6 +1,6 @@ #!/bin/python3 -from kafka_helper import * +from robotframework_pykafka.kafka_helper import * from robot.api import logger from robot.api.deco import keyword