diff --git a/.pylintrc b/.pylintrc index 20452f2..fcee451 100644 --- a/.pylintrc +++ b/.pylintrc @@ -390,8 +390,8 @@ preferred-modules= [EXCEPTIONS] # Exceptions that will emit a warning when caught. -overgeneral-exceptions=BaseException, - Exception +overgeneral-exceptions=builtins.BaseException, + builtins.Exception [REFACTORING] @@ -453,7 +453,7 @@ max-locals=15 max-parents=7 # Maximum number of public methods for a class (see R0904). -max-public-methods=20 +max-public-methods=30 # Maximum number of return / yield for function / method body. max-returns=6 diff --git a/README.md b/README.md index d4d8dc7..2780f0d 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/zephyr-python-api) ![PyPI](https://img.shields.io/pypi/v/zephyr-python-api) ![PyPI - License](https://img.shields.io/pypi/l/zephyr-python-api) -### Project description -This is a set of wrappers for Zephyr Scale (TM4J) REST API. This means you can interact with Zephyr Scale without GUI, access it with python code and create automation scripts for your every day interactions. +## Project description +This is a set of wrappers for both Zephyr Scale and Zephyr Squad (TM4J) REST APIs. This means you can interact with Zephyr without GUI, access it with python code and create automation scripts for your every day interactions. To be done: * More usage examples @@ -13,22 +13,24 @@ To be done: * Convenient docs * Implementing higher level wrappers representing Test Case, Test Cycle, etc. -### Installation +## Installation ``` pip install zephyr-python-api ``` -### Example usage +## Example usage -Zephyr Cloud auth: +### Zephyr Scale + +Zephyr Scale Cloud auth: ```python from zephyr import ZephyrScale zscale = ZephyrScale(token=) ``` -Zephyr Server (TM4J) auth: +Zephyr Scale Server (TM4J) auth: ```python from zephyr import ZephyrScale @@ -58,17 +60,62 @@ test_case = zapi.test_cases.get_test_case("") creation_result = zapi.test_cases.create_test_case("", "test_case_name") ``` -### Troubleshooting +### Zephyr Squad + +Zephyr Squad Server (TM4J) auth: +```python +from zephyr import ZephyrSquad + +# Auth can be made with Jira token +auth = {"token": ""} + +# or with login and password (suggest using get_pass) +auth = {"username": "", "password": ""} + +# or even session cookie dict +auth = {"cookies": ""} + +zsquad = ZephyrSquad(base_url=base_url, **auth) +``` + +Then it is possible to interact with api wrappers: +```python +# Obtain a project's information +project_info = zsquad.actions.project.get_project_info("") + +# Obtain a project's versions/releases +project_versions = zsquad.api.util_resource.get_all_versions("") + +# Get a single test case by its id +test_case = zsquad.actions.test_cases.get_test_case("", fields="id") + +# Create a new test case for a project +data = { + "fields": { + "assignee": { + "name": "" + }, + "description": "" + } +} +creation_result = zsquad.actions.test_cases.create_test_case(projectId="", summary="", data=data) +``` + +## Troubleshooting For troubleshooting see [TROUBLESHOOTING.md](TROUBLESHOOTING.md) -### License +## License This library is licensed under the Apache 2.0 License. -### Links +## Links [Zephyr Scale Cloud API docs](https://support.smartbear.com/zephyr-scale-cloud/api-docs/) -[Zephyr Scale Server API docs](https://support.smartbear.com/zephyr-scale-server/api-docs/v1/) \ No newline at end of file +[Zephyr Scale Server API docs](https://support.smartbear.com/zephyr-scale-server/api-docs/v1/) + +[Zephyr Squad Server API docs](https://zephyrsquadserver.docs.apiary.io/) + +[Zephyr Squad Server How to API docs](https://support.smartbear.com/zephyr-squad-server/docs/api/index.html) diff --git a/examples/server.py b/examples/scale-server.py similarity index 100% rename from examples/server.py rename to examples/scale-server.py diff --git a/examples/squad-server.py b/examples/squad-server.py new file mode 100644 index 0000000..afecb7f --- /dev/null +++ b/examples/squad-server.py @@ -0,0 +1,133 @@ +""" +Usage examples of Zephyr Squad Server API wrappers. +""" +import logging + +from zephyr import ZephyrSquad + +# Enable logging with level Debug for more verbosity +logging.basicConfig(level=logging.DEBUG) + + +# Specify your Jira context to operate with: +base_url = "https://jira.hosted.com/" + +# Use the Jira certificate for TLS connections +session_params = { + "verify": "" +} + +# Create an instance of Zephyr Squad +zsquad = ZephyrSquad( + base_url=base_url, + token="", + session_attrs=session_params +) + +# Now we can start playing with the Zephyr API! + +# Obtain a project's information +project_info = zsquad.actions.project.get_project_info("") + +# Obtain a project's versions/releases +project_versions = zsquad.api.util_resource.get_all_versions("") + +# Get the data of a testcase +test_case = zsquad.actions.test_cases.get_test_case("", fields="id") + +# Get the test steps from a testcase +test_steps = zsquad.api.teststep_resource.get_list_of_teststeps("") + +# Get the information about a test cycle +test_cycle = zsquad.api.cycle_resource.get_cycle_information(cycle_id="") + +# Get the list of all test cycles for a specific release +test_cycles = zsquad.api.cycle_resource.get_list_of_cycle(project_id="", versionId="") + +# Get all folders from a test cycle +test_cycle_folders = zsquad.api.cycle_resource.get_the_list_of_folder_for_a_cycle(cycle_id="", project_id="", version_id="") + +# Get all test executions from a test case +test_executions = zsquad.api.traceability_resource.get_list_of_search_execution_by_test(test_id_or_key="") + +# Create a new test case for a project +data = { + "fields": { + "assignee": { + "name": "" + }, + "description": "" + } +} +ret_data = zsquad.actions.test_cases.create_test_case(project_id="", summary="", data=data) + +# Execute ZQL search query +demo_query = "project = '' AND cycleName = ''" +zql_search_res = zsquad.api.execution_search_resource.execute_search_to_get_search_result(query=demo_query, maxRecords=200) + +# Create a new test cycle for a project based on an existing test case +data = { + "clonedCycleId": "", + "description": "", + "build": "", + "startDate": "29/Nov/22", + "endDate": "4/Dec/22", + "environment": "" +} +ret_data = zsquad.api.cycle_resource.create_new_cycle(project_id="", version_id="", name="", data=data) + +# Create a new test folder for a test cycle +data = { + "cycleId": 1508, # it will be rewritten by the function + "name": "", + "description": "", + "projectId": 10600, # it will be rewritten by the function + "versionId": -1, # it will be rewritten by the function + "clonedFolderId": -1 +} +ret_data = zsquad.api.folder_resource.create_folder_under_cycle(project_id="", version_id="", cycle_id="", data=data) + +# Add a new test case for a test cycle +data = { + "issues":[""], +} +ret_data = zsquad.api.execution_resource.add_test_to_cycle(project_id="", cycle_id="", method="1", data=data) + +# Obtain the execution details +exec_details = zsquad.api.execution_resource.get_execution_information(execution_id="") + +# Obtain the execution steps from an execution +exec_steps = zsquad.api.step_result_resource.get_list_of_step_result(execution_id="") + +# Update the status of an execution step +step_status = zsquad.api.step_result_resource.update_step_result_information(step_result_id="", status=2) + +# Update the execution status +exec_status = zsquad.api.execution_resource.update_execution_details(execution_id="", status=2) + +# Update a folder name and description +data = { + "description": "" +} +ret_data = zsquad.api.folder_resource.update_folder_information(project_id="", version_id="", cycle_id="", folder_id="", name="") + +# Delete 3 test executions +delete_status = zsquad.api.execution_resource.delete_bulk_execution(execution_id=["", "", ""]) + +# Show the progress of a job +job_status = zsquad.api.execution_resource.get_job_progress_status(job_progress_token="") + +# Get a test step's detailed information +test_step = zsquad.api.teststep_resource.get_teststep_information(test_step_id="", issue_id="") + +# Add a attachment (for a execution result: entityId=executionId and entityType='Execution') +attach = zsquad.api.attachment_resource.add_attachment_into_entity(file_path="", entity_id="", entity_type='') + +# Add a assignee to a execution result +add_assignee = zsquad.api.execution_resource.add_assignee_to_execution(execution_id="", assignee="") + +# Create a test execution result in a test cycle +data = { + "folderId": "" +} +ret_data = zsquad.api.execution_resource.create_new_execution(project_id="", cycle_id="", version_id="", issue_id="", data=data) diff --git a/setup.cfg b/setup.cfg index 842ad07..da3b417 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,4 +27,4 @@ install_requires = [options.packages.find] exclude = - tests* \ No newline at end of file + tests* diff --git a/tests/unit/test_scale.py b/tests/unit/test_scale.py index dcae06c..46a7e4c 100644 --- a/tests/unit/test_scale.py +++ b/tests/unit/test_scale.py @@ -5,7 +5,7 @@ from zephyr.scale.scale import DEFAULT_BASE_URL, ZephyrScale -ZSESSION_PATH = "zephyr.scale.scale.ZephyrSession" +ZSESSION_PATH = "zephyr.scale.scale.ZephyrScaleSession" CLOUD_API_WRAP_PATH = "zephyr.scale.scale.CloudApiWrapper" SERVER_API_WRAP_PATH = "zephyr.scale.scale.ServerApiWrapper" diff --git a/tests/unit/test_zephyr_session.py b/tests/unit/test_zephyr_session.py index 0d2433c..9414746 100644 --- a/tests/unit/test_zephyr_session.py +++ b/tests/unit/test_zephyr_session.py @@ -1,8 +1,8 @@ import pytest from requests import Session -from zephyr.scale.scale import DEFAULT_BASE_URL, ZephyrSession -from zephyr.scale.zephyr_session import INIT_SESSION_MSG, InvalidAuthData +from zephyr.scale.scale import DEFAULT_BASE_URL, ZephyrScaleSession +from zephyr.common.zephyr_session import INIT_SESSION_MSG, InvalidAuthData REQUESTS_SESSION_PATH = "requests.sessions.Session" GETLOGGER_PATH = "logging.getLogger" @@ -10,24 +10,24 @@ @pytest.mark.unit -class TestZephyrSession: +class TestZephyrScaleSession: def test_creation(self, mocker): """Tests basic creation logic""" logger_mock = mocker.patch(GETLOGGER_PATH) - zsession = ZephyrSession(DEFAULT_BASE_URL, token="token_test") + zsession = ZephyrScaleSession(DEFAULT_BASE_URL, token="token_test") assert zsession.base_url == DEFAULT_BASE_URL, (f"Attribute base_url expected to be {DEFAULT_BASE_URL}, " f"not {zsession.base_url}") assert isinstance(zsession._session, Session) - logger_mock.assert_called_with("zephyr.scale.zephyr_session") + logger_mock.assert_called_with("zephyr.common.zephyr_session") def test_token_auth(self, mocker): """Test token auth""" token = "test_token" logger_mock = mocker.patch(LOGGER_DEBUG_PATH) - zsession = ZephyrSession(DEFAULT_BASE_URL, token=token) + zsession = ZephyrScaleSession(DEFAULT_BASE_URL, token=token) logger_mock.assert_called_with(INIT_SESSION_MSG.format("token")) assert f"Bearer {token}" == zsession._session.headers.get("Authorization") @@ -38,7 +38,7 @@ def test_credentials_auth(self, mocker): password = "pwdtest" logger_mock = mocker.patch(LOGGER_DEBUG_PATH) - zsession = ZephyrSession(DEFAULT_BASE_URL, username=username, password=password) + zsession = ZephyrScaleSession(DEFAULT_BASE_URL, username=username, password=password) logger_mock.assert_called_with(INIT_SESSION_MSG.format("username and password")) assert (username, password) == zsession._session.auth @@ -48,7 +48,7 @@ def test_cookie_auth(self, mocker): test_cookie = {"cookies": {"cookie.token": "cookie_test"}} logger_mock = mocker.patch(LOGGER_DEBUG_PATH) - zsession = ZephyrSession(DEFAULT_BASE_URL, cookies=test_cookie) + zsession = ZephyrScaleSession(DEFAULT_BASE_URL, cookies=test_cookie) logger_mock.assert_called_with(INIT_SESSION_MSG.format("cookies")) assert test_cookie['cookies'] in zsession._session.cookies.values() @@ -59,7 +59,7 @@ def test_cookie_auth(self, mocker): def test_auth_exception(self, auth_data, exception): """Test exceptions on auth""" with pytest.raises(exception): - ZephyrSession(DEFAULT_BASE_URL, **auth_data) + ZephyrScaleSession(DEFAULT_BASE_URL, **auth_data) @pytest.mark.parametrize("creation_kwargs", [{"token": "token_test", @@ -69,7 +69,7 @@ def test_requests_session_attrs(self, creation_kwargs, mocker): logger_mock = mocker.patch(LOGGER_DEBUG_PATH) session_attrs = creation_kwargs.get('session_attrs') - zsession = ZephyrSession(DEFAULT_BASE_URL, **creation_kwargs) + zsession = ZephyrScaleSession(DEFAULT_BASE_URL, **creation_kwargs) logger_mock.assert_called_with( f"Modify requests session object with {session_attrs}") diff --git a/zephyr/__init__.py b/zephyr/__init__.py index 3232bda..ecb16f6 100644 --- a/zephyr/__init__.py +++ b/zephyr/__init__.py @@ -1,2 +1,3 @@ from zephyr.scale import API_V1, API_V2, ZephyrScale +from zephyr.squad import ZephyrSquad from zephyr.utils.common import cookie_str_to_dict diff --git a/zephyr/scale/zephyr_session.py b/zephyr/common/zephyr_session.py similarity index 72% rename from zephyr/scale/zephyr_session.py rename to zephyr/common/zephyr_session.py index ab233f1..4783419 100644 --- a/zephyr/scale/zephyr_session.py +++ b/zephyr/common/zephyr_session.py @@ -1,5 +1,4 @@ import logging -from urllib.parse import urlparse, parse_qs from requests import HTTPError, Session @@ -13,7 +12,7 @@ class InvalidAuthData(Exception): class ZephyrSession: """ - Zephyr Scale basic session object. + Zephyr basic session object. :param base_url: url to make requests to :param token: auth token @@ -82,34 +81,3 @@ def put(self, endpoint: str, json: dict = None, **kwargs): def delete(self, endpoint: str, **kwargs): """Delete request wrapper""" return self._request("delete", endpoint, **kwargs) - - def get_paginated(self, endpoint, params=None): - """Get paginated data""" - self.logger.debug(f"Get paginated data from endpoint={endpoint} and params={params}") - if params is None: - params = {} - - while True: - response = self.get(endpoint, params=params) - if "values" not in response: - return - for value in response.get("values", []): - yield value - if response.get("isLast") is True: - break - params_str = urlparse(response.get("next")).query - params.update(parse_qs(params_str)) - return - - def post_file(self, endpoint: str, file_path: str, to_files=None, **kwargs): - """ - Post wrapper to send a file. Handles single file opening, - sending its content and closing - """ - with open(file_path, "rb") as file: - files = {"file": file} - - if to_files: - files.update(to_files) - - return self._request("post", endpoint, files=files, **kwargs) diff --git a/zephyr/scale/cloud/cloud_api.py b/zephyr/scale/cloud/cloud_api.py index e39977c..2ab71f7 100644 --- a/zephyr/scale/cloud/cloud_api.py +++ b/zephyr/scale/cloud/cloud_api.py @@ -1,6 +1,5 @@ import logging - -from zephyr.scale.zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession from zephyr.scale.cloud.endpoints import (AutomationEndpoints, EnvironmentEndpoints, FolderEndpoints, @@ -18,7 +17,7 @@ # pylint: disable=missing-function-docstring class CloudApiWrapper: """Zephyr Scale Cloud Api wrapper. Contains wrappers by sections.""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session self.logger = logging.getLogger(__name__) diff --git a/zephyr/scale/cloud/endpoints/automations.py b/zephyr/scale/cloud/endpoints/automations.py index 093a985..ee6c977 100644 --- a/zephyr/scale/cloud/endpoints/automations.py +++ b/zephyr/scale/cloud/endpoints/automations.py @@ -1,12 +1,12 @@ from json import dumps -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class AutomationEndpoints: """Api wrapper for "Automation" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def _post_reports(self, diff --git a/zephyr/scale/cloud/endpoints/environments.py b/zephyr/scale/cloud/endpoints/environments.py index 8dfa029..ad69ac2 100644 --- a/zephyr/scale/cloud/endpoints/environments.py +++ b/zephyr/scale/cloud/endpoints/environments.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class EnvironmentEndpoints: """Api wrapper for "Environment" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_environments(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/folders.py b/zephyr/scale/cloud/endpoints/folders.py index 8b7c332..d3dbd79 100644 --- a/zephyr/scale/cloud/endpoints/folders.py +++ b/zephyr/scale/cloud/endpoints/folders.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class FolderEndpoints: """Api wrapper for "Folder" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_folders(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/healthcheck.py b/zephyr/scale/cloud/endpoints/healthcheck.py index f6ab761..777418e 100644 --- a/zephyr/scale/cloud/endpoints/healthcheck.py +++ b/zephyr/scale/cloud/endpoints/healthcheck.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class HealthcheckEndpoints: """Api wrapper for "Healthcheck" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_health(self): diff --git a/zephyr/scale/cloud/endpoints/links.py b/zephyr/scale/cloud/endpoints/links.py index 1775b91..8eee619 100644 --- a/zephyr/scale/cloud/endpoints/links.py +++ b/zephyr/scale/cloud/endpoints/links.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class LinkEndpoints: """Api wrapper for "Link" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def delete_link(self, link_id): diff --git a/zephyr/scale/cloud/endpoints/priorities.py b/zephyr/scale/cloud/endpoints/priorities.py index c73c86d..0f7f93d 100644 --- a/zephyr/scale/cloud/endpoints/priorities.py +++ b/zephyr/scale/cloud/endpoints/priorities.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class PriorityEndpoints: """Api wrapper for "Priority" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_priorities(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/projects.py b/zephyr/scale/cloud/endpoints/projects.py index 5edc8a6..5f41e27 100644 --- a/zephyr/scale/cloud/endpoints/projects.py +++ b/zephyr/scale/cloud/endpoints/projects.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class ProjectEndpoints: """Api wrapper for "Project" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_projects(self): diff --git a/zephyr/scale/cloud/endpoints/statuses.py b/zephyr/scale/cloud/endpoints/statuses.py index 1f977d1..c051a0b 100644 --- a/zephyr/scale/cloud/endpoints/statuses.py +++ b/zephyr/scale/cloud/endpoints/statuses.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class StatusEndpoints: """Api wrapper for "Status" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_statuses(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/test_cases.py b/zephyr/scale/cloud/endpoints/test_cases.py index 390457d..8972689 100644 --- a/zephyr/scale/cloud/endpoints/test_cases.py +++ b/zephyr/scale/cloud/endpoints/test_cases.py @@ -1,9 +1,9 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class TestCaseEndpoints: """Api wrapper for "Test Case" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_test_cases(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/test_cycles.py b/zephyr/scale/cloud/endpoints/test_cycles.py index 5776fee..85ba3f4 100644 --- a/zephyr/scale/cloud/endpoints/test_cycles.py +++ b/zephyr/scale/cloud/endpoints/test_cycles.py @@ -1,11 +1,11 @@ from typing import Union -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class TestCycleEndpoints: """Api wrapper for "Test Cycle" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_all_test_cycles(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/test_executions.py b/zephyr/scale/cloud/endpoints/test_executions.py index 5ec63d0..6facdde 100644 --- a/zephyr/scale/cloud/endpoints/test_executions.py +++ b/zephyr/scale/cloud/endpoints/test_executions.py @@ -1,11 +1,11 @@ from typing import Union -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class TestExecutionEndpoints: """Api wrapper for "Test Execution" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_test_executions(self, **kwargs): diff --git a/zephyr/scale/cloud/endpoints/test_plans.py b/zephyr/scale/cloud/endpoints/test_plans.py index 4c964e4..6643017 100644 --- a/zephyr/scale/cloud/endpoints/test_plans.py +++ b/zephyr/scale/cloud/endpoints/test_plans.py @@ -1,9 +1,9 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession class TestPlanEndpoints: """Api wrapper for "Test Plan" endpoints""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session def get_test_plans(self, **kwargs): diff --git a/zephyr/scale/scale.py b/zephyr/scale/scale.py index 26a29ab..2fd1f8e 100644 --- a/zephyr/scale/scale.py +++ b/zephyr/scale/scale.py @@ -1,6 +1,6 @@ import logging -from zephyr.scale.zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession from zephyr.scale.cloud.cloud_api import CloudApiWrapper from zephyr.scale.server.server_api import ServerApiWrapper @@ -22,7 +22,7 @@ class ZephyrScale: """ def __init__(self, base_url=None, api_version=API_V2, **kwargs): base_url = DEFAULT_BASE_URL if not base_url else base_url - session = ZephyrSession(base_url=base_url, **kwargs) + session = ZephyrScaleSession(base_url=base_url, **kwargs) if api_version.lower() == API_V2: self.api = CloudApiWrapper(session) diff --git a/zephyr/scale/server/endpoints/endpoints.py b/zephyr/scale/server/endpoints/endpoints.py index 8b3f9cc..e8abcb8 100644 --- a/zephyr/scale/server/endpoints/endpoints.py +++ b/zephyr/scale/server/endpoints/endpoints.py @@ -1,10 +1,10 @@ -from ...zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession from .paths import ServerPaths as Paths class EndpointTemplate: """Class with basic constructor for endpoint classes""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session diff --git a/zephyr/scale/server/server_api.py b/zephyr/scale/server/server_api.py index c52b5c6..4de4edc 100644 --- a/zephyr/scale/server/server_api.py +++ b/zephyr/scale/server/server_api.py @@ -1,13 +1,13 @@ import logging -from zephyr.scale.zephyr_session import ZephyrSession +from zephyr.scale.zephyr_scale_session import ZephyrScaleSession from zephyr.scale.server import endpoints # pylint: disable=missing-function-docstring class ServerApiWrapper: """Zephyr Scale Server Api wrapper""" - def __init__(self, session: ZephyrSession): + def __init__(self, session: ZephyrScaleSession): self.session = session self.logger = logging.getLogger(__name__) diff --git a/zephyr/scale/zephyr_scale_session.py b/zephyr/scale/zephyr_scale_session.py new file mode 100644 index 0000000..7985a5f --- /dev/null +++ b/zephyr/scale/zephyr_scale_session.py @@ -0,0 +1,47 @@ +from urllib.parse import urlparse, parse_qs +from zephyr.common.zephyr_session import ZephyrSession + + +class ZephyrScaleSession(ZephyrSession): + """ + Zephyr Scale basic session object. + + :param base_url: url to make requests to + :param token: auth token + :param username: username + :param password: password + :param cookies: cookie dict + + :keyword session_attrs: a dict with session attrs to be set as keys and their values + """ + + def get_paginated(self, endpoint, params=None): + """Get paginated data""" + self.logger.debug(f"Get paginated data from endpoint={endpoint} and params={params}") + if params is None: + params = {} + + while True: + response = self.get(endpoint, params=params) + if "values" not in response: + return + for value in response.get("values", []): + yield value + if response.get("isLast") is True: + break + params_str = urlparse(response.get("next")).query + params.update(parse_qs(params_str)) + return + + def post_file(self, endpoint: str, file_path: str, to_files=None, **kwargs): + """ + Post wrapper to send a file. Handles single file opening, + sending its content and closing + """ + with open(file_path, "rb") as file: + files = {"file": file} + + if to_files: + files.update(to_files) + + return self._request("post", endpoint, files=files, **kwargs) diff --git a/zephyr/squad/__init__.py b/zephyr/squad/__init__.py new file mode 100644 index 0000000..551ad90 --- /dev/null +++ b/zephyr/squad/__init__.py @@ -0,0 +1 @@ +from zephyr.squad.squad import ZephyrSquad diff --git a/zephyr/squad/server/__init__.py b/zephyr/squad/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zephyr/squad/server/actions.py b/zephyr/squad/server/actions.py new file mode 100644 index 0000000..6a21d06 --- /dev/null +++ b/zephyr/squad/server/actions.py @@ -0,0 +1,20 @@ +import logging + +from zephyr.squad.zephyr_squad_session import ZephyrSquadSession +from zephyr.squad.server import endpoints + + +# pylint: disable=missing-function-docstring +class ServerActionsWrapper: + """Zephyr Squad Server Actions wrapper""" + def __init__(self, session: ZephyrSquadSession): + self.session = session + self.logger = logging.getLogger(__name__) + + @property + def project(self): + return endpoints.ProjectEndpoints(self.session) + + @property + def test_cases(self): + return endpoints.TestCaseEndpoints(self.session) diff --git a/zephyr/squad/server/endpoints/__init__.py b/zephyr/squad/server/endpoints/__init__.py new file mode 100644 index 0000000..b86bdbb --- /dev/null +++ b/zephyr/squad/server/endpoints/__init__.py @@ -0,0 +1,28 @@ +from .actions import( + ProjectEndpoints, + TestCaseEndpoints +) + +from .endpoints import ( + ChartResourceEndpoints, + ExecutionSearchResourceEndpoints, + ZQLFilterResourceEndpoints, + CycleResourceEndpoints, + ZNavResourceEndpoints, + LicenseResourceEndpoints, + PreferenceResourceEndpoints, + StepResultResourceEndpoints, + TraceabilityResourceEndpoints, + TestcaseResourceEndpoints, + UtilResourceEndpoints, + FolderResourceEndpoints, + ExecutionResourceEndpoints, + IssuePickerResourceEndpoints, + AuditResourceEndpoints, + TeststepResourceEndpoints, + AttachmentResourceEndpoints, + ZAPIResourceEndpoints, + ZQLAutoCompleteResourceEndpoints, + SystemInfoResourceEndpoints, + FilterPickerResourceEndpoints +) diff --git a/zephyr/squad/server/endpoints/actions.py b/zephyr/squad/server/endpoints/actions.py new file mode 100644 index 0000000..e9f39a4 --- /dev/null +++ b/zephyr/squad/server/endpoints/actions.py @@ -0,0 +1,64 @@ +from zephyr.squad.zephyr_squad_session import ZephyrSquadSession +from zephyr.utils.common import dict_merge +from .paths import ServerPaths as Paths +from .endpoints import UtilResourceEndpoints as UtilResource + + +class EndpointTemplate: + """Class with basic constructor for endpoint classes""" + def __init__(self, session: ZephyrSquadSession): + self.session = session + + +class ProjectEndpoints(EndpointTemplate): + """Api wrapper for "Project" endpoints""" + + def get_project_info(self, project_key): + """ + Retrieve project information + + :param project_key: Jira project key + :return: dict with response body + """ + return self.session.get(Paths.PRJ_ID.format(project_key)) + + def get_project_versions_by_key(self, project_key): + """ + Retrieve all project versions (releases) by project key using Jira API + + :param project_key: Jira project key + :return: list of dict for each version + """ + return self.session.get(Paths.PRJ_VERSIONS_BY_KEY.format(project_key)) + + +class TestCaseEndpoints(EndpointTemplate): + """Api wrapper for "Test Case" endpoints""" + + def get_test_case(self, test_case_key, **params): + """Retrieve the Test Case matching the given key""" + return self.session.get(Paths.CASE_KEY.format(test_case_key), + params=params) + + def create_test_case(self, project_id, summary, data): + """ + Creates a new Test Case based on a minimum required fields. + + (https://support.smartbear.com/zephyr-squad-server/docs/api/how-to/create-tests.html) + See Zephyr Squad Test Case creation documentation to better understand what can be + modified. + """ + case_type = UtilResource(self.session).get_zephyr_issue_type()["testcaseIssueTypeId"] + json = { + "fields": { + "project": { + "id": project_id + }, + 'issuetype': { + 'id': case_type + }, + "summary": summary, + } + } + merged_json = dict_merge(data, json) + return self.session.post(Paths.CASE, json=merged_json) diff --git a/zephyr/squad/server/endpoints/endpoints.py b/zephyr/squad/server/endpoints/endpoints.py new file mode 100644 index 0000000..f17224c --- /dev/null +++ b/zephyr/squad/server/endpoints/endpoints.py @@ -0,0 +1,874 @@ +from zephyr.squad.zephyr_squad_session import ZephyrSquadSession +from zephyr.utils.common import dict_merge +from .paths import ServerPaths as Paths + + +class EndpointTemplate: + """Class with basic constructor for endpoint classes""" + def __init__(self, session: ZephyrSquadSession): + self.session = session + + +class ChartResourceEndpoints(EndpointTemplate): + """Api wrapper for ChartResource""" + + def get_issue_status_by_project(self, project_id): + """Get Issue Statuses by Project Id""" + params = { + "projectId": project_id + } + return self.session.get(Paths.ZCHART_STATUS, params=params) + + def generate_test_created_data(self, project_key, **params): + """Generate Test's Created Data by Project Key""" + params.update(projectKey=project_key) + return self.session.get(Paths.ZCHART_TESTS, params=params) + + +class ExecutionSearchResourceEndpoints(EndpointTemplate): + """Api wrapper for ExecutionSearchResource""" + + def execute_search_to_get_search_result(self, query, **params): + """Execute Search to Get ZQL Search Result by zqlQuery""" + params.update(zqlQuery=f'({query})') + return self.session.get_paginated(Paths.ZQL_SEARCH, query_type="execution", params=params) + + def get_search_clauses(self): + """Get List of Search Clauses""" + return self.session.get(Paths.ZQL_CLAUSES) + + def get_autocomplete_zql_json(self): + """Get AutoComplete JSON Execution""" + return self.session.get(Paths.ZQL_AUTOCOMP) + + +class ZQLFilterResourceEndpoints(EndpointTemplate): + """ + Following section describes the rest resources (API's) pertaining to ExecutionFilterResource + """ + + def get_zql_filter(self, filter_id): + """Get ZQL filter by it's id""" + return self.session.get(Paths.ZQL_FILTER_ID.format(filter_id)) + + def delete_zql_filter(self, filter_id): + """Deletes a ZQL filter by id""" + return self.session.delete(Paths.ZQL_FILTER_ID.format(filter_id)) + + def get_all_execution_filters(self, **params): + """Get All Execution Filters""" + raise NotImplementedError + + def search_execution_filters(self, **params): + """Search Execution Filters by Filter Name""" + raise NotImplementedError + + def quick_search_zql_filters(self, query): + """Quick Search Execution Filters by Query""" + params = { + 'query': query + } + return self.session.get(Paths.ZQL_FILTER_QUICK_SEARCH, params=params) + + def copy_zql_filter(self, filter_id, filter_name): + """Copy Execution Filter by Filter Name""" + json = { + "id": filter_id, + "filterName": filter_name + } + return self.session.put(Paths.ZQL_FILTER_COPY, json=json) + + def create_execution_filter(self, **params): + """Create new execution filter""" + raise NotImplementedError + + def update_the_zql_filter(self, **params): + """Update Execution Filter""" + raise NotImplementedError + + def rename_zql_filter(self, filter_id, filter_name): + """Rename an Execution Filter""" + json = { + "id": filter_id, + "filterName": filter_name + } + return self.session.put(Paths.ZQL_FILTER_RENAME, json=json) + + def toggle_zql_filter_isfavorites(self, filter_id, is_favorite=True): + """Toggle ZQL filter 'isFavorites'""" + json = { + "id": filter_id, + "isFavorite": is_favorite + } + return self.session.put(Paths.ZQL_FILTER_FAV, json=json) + + def get_loggedin_user(self): + """Get LoggedIn User""" + return self.session.get(Paths.ZQL_FILTER_USER) + + +class CycleResourceEndpoints(EndpointTemplate): + """Following section describes rest resources (API's) pertaining to CycleResource""" + + def get_cycle_information(self, cycle_id): + """Get Cycle data by Cycle Id. If cycleId -1 is passed, system returns hardcoded cycle""" + return self.session.get(Paths.CYCLE_ID.format(cycle_id)) + + def export_cycle_data(self, **params): + """Export Cycle by Cycle Id, file is generated on fly and streamed to client""" + raise NotImplementedError + + def create_new_cycle(self, project_id, version_id, name, data=None): + """ + Create New Cycle by given Cycle Information + + See Zephyr Squad Cycle creation documentation to better understand what can be modified. + https://support.smartbear.com/zephyr-squad-server/docs/api/how-to/create-cycle.html + """ + if data is None: + data = {} + + json = { + 'name': name, + 'projectId': project_id, + 'versionId': version_id + } + merged_json = dict_merge(data, json) + return self.session.post(Paths.CYCLE, json=merged_json) + + def update_cycle_information(self, **params): + """Update Cycle Information""" + raise NotImplementedError + + def delete_cycle(self, cycle_id, is_folder_cycle_delete=""): + """ + Delete Cycle by Cycle Id + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=cycle_delete_job_progress. + Once the request is processed, the jobProgress will populate the message field with result. + """ + params = { + "isFolderCycleDelete": is_folder_cycle_delete, + } + return self.session.delete(Paths.CYCLE_ID.format(cycle_id), params=params) + + def get_list_of_cycle(self, project_id, **params): + """Get List of Cycle by Project Id""" + params.update(projectId=project_id) + return self.session.get(Paths.CYCLE, params=params) + + def move_executions_to_cycle(self, **params): + """ + Move Executions to Cycle by Cycle Id + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=bulk_execution_copy_move_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + def get_cycles_by_versions_sprint(self, **params): + """Get Cycles by Version Id, Project Id""" + raise NotImplementedError + + def clean_up_sprint_from_cycle(self, **params): + """Cleanup sprint data from cycle""" + raise NotImplementedError + + def get_the_list_of_folder_for_a_cycle(self, project_id, version_id, cycle_id, **params): + """Get the list of folder for a cycle""" + params.update(projectId=project_id, versionId=version_id) + return self.session.get(Paths.CYCLE_FOLDERS.format(cycle_id), params=params) + + def move_selected_executions_or_all_executions_from_cycle_to_folder(self, **params): + """Move selected executions or all executions from cycle to folder""" + raise NotImplementedError + + def copy_executions_to_cycle(self, **params): + """ + Copy Executions to Cycle By Cycle Id + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=bulk_execution_copy_move_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + +class ZNavResourceEndpoints(EndpointTemplate): + """Rest end point for Zephyr Navigation(znav)""" + + def get_available_columns(self, **params): + """Get Available Columns by Execution Filter Id""" + raise NotImplementedError + + def create_column_selection(self, **params): + """Create/Save Column Selection""" + raise NotImplementedError + + def update_column_selection(self, **params): + """Update Column Selection by Column Layout Id""" + raise NotImplementedError + + +class LicenseResourceEndpoints(EndpointTemplate): + """ + Following section describes the rest resources (API's) pertaining to Zephyr Squad + LicenseResource + """ + + def get_license_status_information(self): + """Get License Status Inforation""" + return self.session.get(Paths.LICENSE) + + +class PreferenceResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to PreferenceResource""" + + def set_test_step_customization_preference(self, **params): + """Set test step customization preference.""" + raise NotImplementedError + + def get_test_step_customization_preference(self): + """Get test step customization preference.""" + return self.session.get(Paths.STEP_PREF) + + def set_cycle_summary_columns_customization_preference(self, **params): + """Set cycle summary columns customization preference.""" + raise NotImplementedError + + def get_cycle_summary_customization_preference(self): + """Get cycle summary column customization preference.""" + return self.session.get(Paths.CYCLE_PREF) + + def set_execution_summary_columns_customization_preference(self, **params): + """Set execution summary columns customization preference.""" + raise NotImplementedError + + def get_execution_summary_customization_preference(self): + """Get execution summary column customization preference.""" + return self.session.get(Paths.EXEC_PREF) + + +class StepResultResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to StepResultResource""" + + def get_list_of_step_result(self, execution_id, expand=""): + """Get List of Step Result by Execution Id""" + params = { + "executionId": execution_id, + "expand": expand + } + return self.session.get(Paths.STEP_RES, params=params) + + def get_step_result_information(self, step_result_id, expand=""): + """Get Single Step Result Information by StepResult Id""" + params = { + "expand": expand + } + return self.session.get(Paths.STEP_RES_ID.format(step_result_id), params=params) + + def create_new_step_result(self, **params): + """Create New StepResult, StepResult gets created as soon as execution is fetched""" + raise NotImplementedError + + def update_step_result_information(self, step_result_id, status): + """ + Update StepResult Information by StepResult Id. Available status values: + -1 = UNEXECUTED | 1 = PASS | 2 = FAIL | 3 = WIP | 4 = BLOCKED + """ + json = { + "status": status + } + return self.session.put(Paths.STEP_RES_ID.format(step_result_id), json=json) + + def get_list_of_step_defect(self, step_result_id): + """Get List of StepDefect by StepResult Id""" + return self.session.get(Paths.STEP_RES_DEF.format(step_result_id)) + + def get_list_of_step_defect_by_execution(self, execution_id, expand=""): + """Get List of StepDefect by Execution Id""" + params = { + "executionId": execution_id, + "expand": expand + } + return self.session.get(Paths.STEP_RES_EXEC_DEF, params=params) + + +class TraceabilityResourceEndpoints(EndpointTemplate): + """Rest end point for Traceability""" + + def search_defect_statistics(self, **params): + """Search Defect Statistics by Defect Id/Key List""" + raise NotImplementedError + + def search_execution_by_efect(self, **params): + """Get Execution by Defect Id/Key""" + raise NotImplementedError + + def get_list_of_search_execution_by_test(self, test_id_or_key, **params): + """Get Search Execution List by Test Id/Key""" + params.update(testIdOrKey=test_id_or_key) + return self.session.get(Paths.EXEC_BY_TEST, params=params) + + def get_list_of_search_test_by_requirement(self, **params): + """Get Search Test by Requirement Id/Key""" + raise NotImplementedError + + def export_traceability_report(self, **params): + """Export Traceability Report by Defect Id List""" + raise NotImplementedError + + +class TestcaseResourceEndpoints(EndpointTemplate): + """Following section describes rest resources pertaining to TestcaseResource""" + + def get_test_count_list(self, project_id, version_id, group_fld="user"): + """ + Get Count List of Tests by Project Id, Version Id. + + group_fld ("user" | "component") defaults to "user". + """ + params = { + "projectId": project_id, + "versionId": version_id, + "groupFld": group_fld + } + return self.session.get(Paths.TESTS_COUNT, params=params) + + def get_list_of_saved_searches(self, **params): + """Get List of Saved Searches by SaveSearch Id""" + raise NotImplementedError + + def add_issue_link(self, **params): + """Add Issue Link from Issue to Zephyr Test""" + raise NotImplementedError + + def fetch_tests_by_label(self, project_id, label_name, **params): + """ + Fetch Tests By Label + + The labelName acts like a regex, so the word 'ak' will bring all labels that contains + 'ak' in them like 'akonadi'. The query will also return all test cases without any label + in an entry called 'No Label'. + """ + params.update({"projectId": project_id, "labelName": label_name}) + return self.session.get_paginated(Paths.TESTS_LABEL, query_type="test-label", params=params) + + def fetch_tests_by_component(self, **params): + """Fetch Tests By Component""" + raise NotImplementedError + + def fetch_tests_by_version(self, **params): + """Fetch Tests By Version""" + raise NotImplementedError + + +class UtilResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources related to common utility API(s)""" + + def get_all_versions(self, project_id, **params): + """Get List of Versions (Releases)""" + params.update(projectId=project_id) + return self.session.get(Paths.UTIL_VER_LIST, params=params) + + def get_zephyr_issue_type(self): + """Get Zephyr Issue Type""" + return self.session.get(Paths.CASE_ISSUE_TYPE) + + def get_all_projects(self): + """Get List of Projects""" + return self.session.get(Paths.UTIL_PROJ_LIST) + + def get_all_versions_text(self): + """Get All Versions Text""" + return self.session.get(Paths.UTIL_VER_TEXT) + + def get_list_of_sprints(self, project_id, version_id): + """Get List of Sprints by Project Id and Version Id""" + params = { + "projectId": project_id, + "versionId": version_id + } + return self.session.get(Paths.SPRINT_PROJ_VER, params=params) + + def get_cycle_criteria_info(self, project_id): + """Get Cycle Criteria Information""" + params = { + "projectId": project_id, + } + return self.session.get(Paths.UTIL_CRITERIA, params=params) + + def get_execution_statuses_priorities_components_labels_using_tes(self, **params): + """Get Execution Statuses, Priorities, Components, Labels using testExecutionStatus""" + raise NotImplementedError + + def get_execution_statuses_priorities_components_labels_using_tses(self, **params): + """Get Execution Statuses, Priorities, Components, Labels using teststepExecutionStatus""" + raise NotImplementedError + + def get_dashboard_summary(self, **params): + """Get Dashboard information by Query""" + raise NotImplementedError + + def get_components_using_cl(self, **params): + """Gets all component for a project using component-list""" + raise NotImplementedError + + def get_components_using_tsl(self, **params): + """Gets all component for a project using teststatus-list""" + raise NotImplementedError + + +class FolderResourceEndpoints(EndpointTemplate): + """Following section describes rest resources pertaining to Folder Resource""" + + def create_folder_under_cycle(self, project_id, cycle_id, version_id, data=None): + """ + Creates a Folder under the provided cycle + + See Zephyr Squad Folder creation documentation to better understand what can be modified. + https://support.smartbear.com/zephyr-squad-server/docs/api/how-to/create-folder-in-cycle.html + """ + if data is None: + data = {} + + json = { + 'cycleId': cycle_id, + 'projectId': project_id, + 'versionId': version_id + } + merged_json = dict_merge(data, json) + return self.session.post(Paths.FOLDER_CREATE, json=merged_json) + + def update_folder_information(self, project_id, cycle_id, version_id, folder_id, name, + data=None): + """Updates Folder information""" + if data is None: + data = {} + + json = { + 'cycleId': cycle_id, + 'projectId': project_id, + 'versionId': version_id, + "folderId": folder_id, + "name": name, + } + merged_json = dict_merge(data, json) + return self.session.put(Paths.FOLDER_ID.format(folder_id), json=merged_json) + + def delete_folder_under_cycle(self, folder_id, project_id, version_id, cycle_id): + """Deletes a Folder""" + params = { + "projectId": project_id, + "versionId": version_id, + "cycleId": cycle_id, + } + return self.session.delete(Paths.FOLDER_ID.format(folder_id), params=params) + + +class ExecutionResourceEndpoints(EndpointTemplate): + """Following section describes rest resources (API's) pertaining to ExecutionResource""" + + def get_execution_information(self, execution_id, **params): + """Gets all executions available for given execution Id""" + return self.session.get(Paths.EXEC_ID.format(execution_id), + params=params) + + def get_list_of_execution(self, **params): + """Get all execution available for given issue id""" + raise NotImplementedError + + def get_defect_list(self, execution_id): + """Get all defect available for given Execution Id""" + return self.session.get(Paths.EXEC_DEF.format(execution_id)) + + def add_assignee_to_execution(self, execution_id, assignee): + """Add Assignee to execution by Execution Id""" + params = { + 'assignee': assignee + } + return self.session.post(Paths.EXEC_ID.format(execution_id), params=params) + + def create_new_execution(self, project_id, version_id, cycle_id, issue_id, data=None): + """Use this resource to create new execution""" + if data is None: + data = {} + + json = { + 'projectId': project_id, + 'versionId': version_id, + 'cycleId': cycle_id, + 'issueId': issue_id + } + merged_json = dict_merge(data, json) + return self.session.post(Paths.EXEC, json=merged_json) + + def get_execution_count_summary(self, **params): + """ + Get Execution Count List. + + It takes care of 3 execution counts + 1. execution summary - projectId + groupFld:timePeriod + 2. test execution gadget - projectId + version + groupFld:cycle|user|component + 3. burndown - projectId + versionId + cycleId + groupFld:timePeriod + """ + raise NotImplementedError + + def get_top_defect_by_issue_status(self, **params): + """Get Defect List by Project Id, Version Id, Issue Status""" + raise NotImplementedError + + def re_index_all_execution(self, **params): + """ + Re Index all Execution + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=reindex_job_progress. + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + def re_index_all_execution_for_current_node(self, **params): + """ + Re Index all Execution for Current Node + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=reindex_job_progress. + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + def re_index_all_execution_for_given_project_ids(self, **params): + """ + Re Index all Execution for given project id(s) + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=reindex_job_progress. + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + def get_index_status(self, **params): + """Get Index Status by Token""" + raise NotImplementedError + + def get_job_progress_status(self, job_progress_token, **params): + """Get job progress with status""" + return self.session.get(Paths.EXEC_JOB_PROG.format(job_progress_token), params=params) + + def update_bulk_defects(self, **params): + """ + Update bulk Defect by Executions, Defects + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=bulk_execution_associate_defect_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + def update_execution_details(self, execution_id, status, data=None): + """ + Update Execution Details by Execution Id. Available status values: + -1 = UNEXECUTED | 1 = PASS | 2 = FAIL | 3 = WIP | 4 = BLOCKED + """ + if data is None: + data = {} + + json = { + "status": status + } + merged_json = dict_merge(data, json) + + return self.session.put(Paths.EXEC_EXECUTE.format(execution_id), json=merged_json) + + def delete_execution(self, execution_id): + """Execution with the given Id will be deleted.""" + return self.session.delete(Paths.EXEC_ID.format(execution_id)) + + def add_test_to_cycle(self, project_id, cycle_id, method, data=None): + """ + Add Test to Cycle + + This API will execute based on following conditions: + 1. From individual test required following params: + (assigneeType, cycleId, issues, method = 1, projectId, versionId) + 2. From search filter required following params: + (assigneeType, cycleId, issues, method = 2, projectId, versionId, searchId) + 3. From another cycle required following params: + (assigneeType, cycleId, issues, method = 3, projectId, versionId, components, + fromCycleId, fromVersionId, hasDefects, labels, priorities, statuses) + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=add_tests_to_cycle_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + + See Zephyr Squad documentation to better understand what can be modified. + https://support.smartbear.com/zephyr-squad-server/docs/api/how-to/add-tests-to-cycle.html + """ + if data is None: + data = {} + + json = { + 'cycleId': cycle_id, + 'projectId': project_id, + 'method': method + } + merged_json = dict_merge(data, json) + return self.session.post(Paths.CASE_TO_CYCLE, json=merged_json) + + def refresh_issue_link_status(self, **params): + """Refresh Link Status(not found/in progress/completed)""" + raise NotImplementedError + + def export_execution(self, **params): + """Export Selected Execution by Selected Export Format(RSS/HTML/XLS/CSV/XML)""" + raise NotImplementedError + + def navigate_execution(self, **params): + """ + Validates and executes search against zephyr indexes. offset and limit provides a way to + define the beginning and the max limit allowed + """ + raise NotImplementedError + + def re_order_execution(self, **params): + """Re Order Execution""" + raise NotImplementedError + + def get_execution_summaries_by_sprint_and_issue(self, **params): + """Gets all execution available for given Issue Id""" + raise NotImplementedError + + def get_execution_summary_by_issue(self, **params): + """Gets all execution available for given Issue Id""" + raise NotImplementedError + + def get_executions_count_for_cycles_by_given_project_id_and_version_id(self, **params): + """Get Executions count for cycles by given project id and version id""" + raise NotImplementedError + + def get_executions_count_for_given_cycle(self, **params): + """Get Executions count for given cycle""" + raise NotImplementedError + + def get_executions_count_per_assignee_for_given_cycle(self, **params): + """Get Executions count per assignee for given cycle""" + raise NotImplementedError + + def delete_bulk_execution(self, execution_id): + """ + Delete bulk Execution by Execution Id + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=bulk_executions_delete_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + """ + if not isinstance(execution_id, list): + execution_id = [ execution_id ] + + json = { + "executions": execution_id + } + return self.session.delete(Paths.EXEC_DEL, json=json) + + def refresh_issue_remote_link(self, **params): + """Refresh Issue to Test/Step Link or Remote Link""" + raise NotImplementedError + + def assign_bulk_executions(self, **params): + """ + Add bulk execution with assignee type + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=bulk_execution_assign_user_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + def update_bulk_execution_status(self, **params): + """ + Update bulk Execution Status by Status + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=update_bulk_execution_status_job_progress. # pylint: disable=line-too-long + Once the request is processed, the jobProgress will populate the message field with result. + """ + raise NotImplementedError + + +class IssuePickerResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to IssuePickerResource""" + + def get_issues_for_test(self, **params): + """Get Issues Data for Test by Query""" + raise NotImplementedError + + def get_default_issue_type(self, project_id): + """Get Default Issue Type by Project Id""" + params = { + "projectId": project_id + } + return self.session.get(Paths.ISSUE_DEFAULT, params=params) + + +class AuditResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to AuditResource""" + + def get_audit_log(self, **params): + """Get Audit Log by Entity Type, Event, User""" + raise NotImplementedError + + +class TeststepResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to TestStepResource""" + + def delete_teststep(self, issue_id, test_step_id): + """Delete TestStep by TestStep Id, Issue Id""" + return self.session.delete(Paths.STEP_ISSUE_ID.format(issue_id, test_step_id)) + + def get_teststep_information(self, issue_id, test_step_id): + """Get TestStep Information by TestStep Id, IssueId""" + return self.session.get(Paths.STEP_ISSUE_ID.format(issue_id, test_step_id)) + + def update_teststep_data(self, issue_id, test_step_id, + test_step=None, step_data=None, step_result=None): + """Update TestStep Information by TestStep Id, Issue Id""" + json = {} + if test_step is not None: + json['step'] = test_step + if step_data is not None: + json['data'] = step_data + if step_result is not None: + json['result'] = step_result + return self.session.put(Paths.STEP_ISSUE_ID.format(issue_id, test_step_id), json=json) + + def get_list_of_teststeps(self, issue_id, **params): + """Get List of TestSteps by Issue Id. 'offset' and 'limit' are optional""" + return self.session.get(Paths.STEP_ISSUE.format(issue_id), params=params) + + def create_new_teststep(self, issue_id, test_step=None, step_data=None, step_result=None): + """Create New TestStep by Issue Id""" + json = { + 'step': test_step, + 'data': step_data, + 'result': step_result + } + return self.session.post(Paths.STEP_ISSUE.format(issue_id), json=json) + + def move_teststep_to_issue(self, issue_id, test_step_id, after_id=None): + """ + Move TestStep to Issue. + + The API call considers the 'First' position to be last 'Order Id'. If no after_id element + is given, the element is moved by default on the 'First' position. + """ + json = {} + if after_id: + json['after'] = "/jira/" + Paths.STEP_ISSUE_ID.format(issue_id, after_id) + else: + json['position'] = "First" + return self.session.post(Paths.STEP_ISSUE_MOVE.format(issue_id, test_step_id), json=json) + + def clone_teststep(self, issue_id, test_step_id, position=-1): + """Clone TestStep by from TestStep Id, Issue Id""" + json = { + 'position': position + } + return self.session.post(Paths.STEP_ISSUE_CLONE.format(issue_id, test_step_id), json=json) + + def copy_test_steps_from_source_to_destination_issues(self, issue_key, destination_issues, + is_jql=False, copy_attachments=False, copy_custom_field=False): + """ + Copy Test steps from source to destination issues. The keys should be used in all instances. + + This API returns a jobProgressToken which should be used for making the call to + /rest/zapi/latest/execution/jobProgress/:jobProgressToken?type=copy_test_step_job_progress. + Once the request is processed, the jobProgress will populate the message field with result. + """ + if not isinstance(destination_issues, list): + destination_issues = [ destination_issues ] + + json = { + 'destinationIssues': ','.join(destination_issues), + 'copyAttachments': copy_attachments, + 'copyCustomField': copy_custom_field, + 'isJql': is_jql + } + return self.session.post(Paths.STEP_ISSUE_COPY.format(issue_key), json=json) + + +class AttachmentResourceEndpoints(EndpointTemplate): + """ + Following section describes the rest resources (API's) for fetching and uploading attachments. + To get attachment for a given entity (Execution or Its Stepresults) or upload, You would need + its id: Unique Identifier in DB Type: execution | stepresult + """ + + def delete_attachment(self, attach_file_id): + """Delete Attachment by Attachment Id""" + return self.session.delete(Paths.ATTACH_ID.format(attach_file_id)) + + def get_single_attachment(self, attach_file_id): + """Get Attachment Details by Attachment Id""" + return self.session.get(Paths.ATTACH_ID.format(attach_file_id)) + + def add_attachment_into_entity(self, file_path, entity_id, entity_type): + """Add Attachment into Entity by Entity Id, Entity Type""" + params = { + 'entityId': entity_id, + 'entityType': entity_type + } + return self.session.post_file(Paths.ATTACH, file_path=file_path, params=params) + + def get_attachment_by_entity(self, entity_id, entity_type): + """Get Attachments by Entity Id, Entity Type""" + params = { + 'entityId': entity_id, + 'entityType': entity_type + } + return self.session.get(Paths.ATTACH_ENT, params=params) + + def get_attachment_file(self, attach_file_id, attachment_name=None): + """Get Attachment file by Attachment file Id""" + response = self.session.get(Paths.ATTACH_FILE.format(attach_file_id), return_raw=True) + if attachment_name: + with open(attachment_name, 'wb') as file: + file.write(response.content) + return response.content + + +class ZAPIResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to ZAPIResource""" + + def get_zapi_module_status(self): + """Get ZAPI Module Status""" + return self.session.get(Paths.MOD_INFO) + + +class ZQLAutoCompleteResourceEndpoints(EndpointTemplate): + """ + Following section describes rest resources (API's) pertaining to + ExecutionFilteAutoCompleteResource + """ + + def get_zql_auto_complete_result(self, **params): + """Get ZQL Auto Complete Result by Field Name, Field Value""" + raise NotImplementedError + + +class SystemInfoResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to SystemInfoResource""" + + def get_system_information(self): + """Get System Information""" + return self.session.get(Paths.SYS_INFO) + + +class FilterPickerResourceEndpoints(EndpointTemplate): + """Following section describes the rest resources pertaining to FilterPickerResource""" + + def get_search_for_filter(self, **params): + """Get Search For Filter""" + raise NotImplementedError diff --git a/zephyr/squad/server/endpoints/paths.py b/zephyr/squad/server/endpoints/paths.py new file mode 100644 index 0000000..93089fe --- /dev/null +++ b/zephyr/squad/server/endpoints/paths.py @@ -0,0 +1,109 @@ +"""Paths to form Server API URLs""" + + +class ServerPaths: + """ + Zephyr Squad Server API (v1) paths based on: + https://zephyrsquadserver.docs.apiary.io/ + https://support.smartbear.com/zephyr-squad-server/docs/api/index.html + """ + + _JIRA_API = "rest/api/latest" + _ZEPHYR_API = "rest/zapi/latest" + + # ZQL + ZQL_SEARCH = _ZEPHYR_API + "/zql/executeSearch" + ZQL_CLAUSES = _ZEPHYR_API + "/zql/clauses" + ZQL_AUTOCOMP = _ZEPHYR_API + "/zql/autocompleteZQLJson" + ZQL_FILTER = _ZEPHYR_API + "/zql/executionFilter" + ZQL_FILTER_ID = _ZEPHYR_API + "/zql/executionFilter/{}" + ZQL_FILTER_COPY = _ZEPHYR_API + "/zql/executionFilter/copy" + ZQL_FILTER_FAV = _ZEPHYR_API + "/zql/executionFilter/toggleFav" + ZQL_FILTER_USER = _ZEPHYR_API + "/zql/executionFilter/user" + ZQL_FILTER_RENAME = _ZEPHYR_API + "/zql/executionFilter/rename" + ZQL_FILTER_QUICK_SEARCH = _ZEPHYR_API + "/zql/executionFilter/quickSearch" + + # Test Case + CASE = _JIRA_API + "/issue" + CASE_KEY = _JIRA_API + "/issue/{}" + ISSUE_DEFAULT = _ZEPHYR_API + "/issues/default" + + # Test + TESTS_COUNT = _ZEPHYR_API + "/test/count" + TESTS_LABEL = _ZEPHYR_API + "/test/summary/testsbylabel" + + # Test Cycle + CYCLE = _ZEPHYR_API + "/cycle" + CYCLE_ID = _ZEPHYR_API + "/cycle/{}" + CYCLE_FOLDERS = _ZEPHYR_API + "/cycle/{}/folders" + CYCLE_BY_VER_SPR = _ZEPHYR_API + "/cycle/cyclesByVersionsAndSprint" + + # Test Execution + EXEC = _ZEPHYR_API + "/execution" + EXEC_ID = _ZEPHYR_API + "/execution/{}" + EXEC_EXECUTE = _ZEPHYR_API + "/execution/{}/execute" + EXEC_DEF = _ZEPHYR_API + "/execution/{}/defects" + EXEC_DEL = _ZEPHYR_API + "/execution/deleteExecutions" + EXEC_BY_ISSUE = _ZEPHYR_API + "/execution/executionsByIssue" + EXEC_COUNT_BY_CYCLE = _ZEPHYR_API + "/execution/executionsStatusCountByCycle" + CASE_TO_CYCLE = _ZEPHYR_API + "/execution/addTestsToCycle" + EXEC_JOB_PROG = _ZEPHYR_API + "/execution/jobProgress/{}" + + # Test Step + STEP_ISSUE = _ZEPHYR_API + "/teststep/{}" + STEP_ISSUE_ID = _ZEPHYR_API + "/teststep/{}/{}" + STEP_ISSUE_MOVE = _ZEPHYR_API + "/teststep/{}/{}/move" + STEP_ISSUE_CLONE = _ZEPHYR_API + "/teststep/{}/clone/{}" + STEP_ISSUE_COPY = _ZEPHYR_API + "/teststep/{}/copyteststeps" + + # Test Step Result + STEP_RES = _ZEPHYR_API + "/stepResult" + STEP_RES_ID = _ZEPHYR_API + "/stepResult/{}" + STEP_RES_DEF = _ZEPHYR_API + "/stepResult/{}/defects" + STEP_RES_EXEC_DEF = _ZEPHYR_API + "/stepResult/stepDefects" + + # Traceability + EXEC_BY_TEST = _ZEPHYR_API + "/traceability/executionsByTest" + + # Folder + FOLDER_ID = _ZEPHYR_API + "/folder/{}" + FOLDER_CREATE = _ZEPHYR_API + "/folder/create" + + # Attachment + ATTACH = _ZEPHYR_API + "/attachment" + ATTACH_ID = _ZEPHYR_API + "/attachment/{}" + ATTACH_ENT = _ZEPHYR_API + "/attachment/attachmentsByEntity" + ATTACH_FILE = _ZEPHYR_API + "/attachment/{}/file" + + # Project + PRJ_ID = _JIRA_API + "/project/{}" + PRJ_VERSIONS_BY_KEY = _JIRA_API + "/project/{}/versions" + + # Util + UTIL_VER_LIST = _ZEPHYR_API + "/util/versionBoard-list" + UTIL_PROJ_LIST = _ZEPHYR_API + "/util/project-list" + UTIL_VER_TEXT = _ZEPHYR_API + "/util/allversionstext" + CASE_ISSUE_TYPE = _ZEPHYR_API + "/util/zephyrTestIssueType" + SPRINT_PROJ_VER = _ZEPHYR_API + "/util/sprintsByProjectAndVersion" + UTIL_CRITERIA = _ZEPHYR_API + "/util/cycleCriteriaInfo" + + # Preference + STEP_PREF = _ZEPHYR_API + "/preference/getteststepcustomization" + CYCLE_PREF = _ZEPHYR_API + "/preference/getcyclesummarycustomization" + EXEC_PREF = _ZEPHYR_API + "/preference/getexecutioncustomization" + + # Audit + AUDIT = _ZEPHYR_API + "/audit" + + # License + LICENSE = _ZEPHYR_API + "/license" + + # System Info + SYS_INFO = _ZEPHYR_API + "/systemInfo" + + # Module Info + MOD_INFO = _ZEPHYR_API + "/moduleInfo" + + # ZChart + ZCHART_STATUS = _ZEPHYR_API + "/zchart/issueStatuses" + ZCHART_TESTS = _ZEPHYR_API + "/zchart/testsCreated" diff --git a/zephyr/squad/server/server_api.py b/zephyr/squad/server/server_api.py new file mode 100644 index 0000000..a0caeb0 --- /dev/null +++ b/zephyr/squad/server/server_api.py @@ -0,0 +1,96 @@ +import logging + +from zephyr.squad.zephyr_squad_session import ZephyrSquadSession +from zephyr.squad.server import endpoints + + +# pylint: disable=missing-function-docstring +class ServerApiWrapper: + """Zephyr Squad Server Api wrapper""" + def __init__(self, session: ZephyrSquadSession): + self.session = session + self.logger = logging.getLogger(__name__) + + @property + def chart_resource(self): + return endpoints.ChartResourceEndpoints(self.session) + + @property + def execution_search_resource(self): + return endpoints.ExecutionSearchResourceEndpoints(self.session) + + @property + def zql_filter_resource(self): + return endpoints.ZQLFilterResourceEndpoints(self.session) + + @property + def cycle_resource(self): + return endpoints.CycleResourceEndpoints(self.session) + + @property + def znav_resource(self): + return endpoints.ZNavResourceEndpoints(self.session) + + @property + def license_resource(self): + return endpoints.LicenseResourceEndpoints(self.session) + + @property + def preference_resource(self): + return endpoints.PreferenceResourceEndpoints(self.session) + + @property + def step_result_resource(self): + return endpoints.StepResultResourceEndpoints(self.session) + + @property + def traceability_resource(self): + return endpoints.TraceabilityResourceEndpoints(self.session) + + @property + def testcase_resource(self): + return endpoints.TestcaseResourceEndpoints(self.session) + + @property + def util_resource(self): + return endpoints.UtilResourceEndpoints(self.session) + + @property + def folder_resource(self): + return endpoints.FolderResourceEndpoints(self.session) + + @property + def execution_resource(self): + return endpoints.ExecutionResourceEndpoints(self.session) + + @property + def issue_picker_resource(self): + return endpoints.IssuePickerResourceEndpoints(self.session) + + @property + def audit_resource(self): + return endpoints.AuditResourceEndpoints(self.session) + + @property + def teststep_resource(self): + return endpoints.TeststepResourceEndpoints(self.session) + + @property + def attachment_resource(self): + return endpoints.AttachmentResourceEndpoints(self.session) + + @property + def zapi_resource(self): + return endpoints.ZAPIResourceEndpoints(self.session) + + @property + def zql_autocomplete_resource(self): + return endpoints.ZQLAutoCompleteResourceEndpoints(self.session) + + @property + def systeminfo_resource(self): + return endpoints.SystemInfoResourceEndpoints(self.session) + + @property + def filter_picker_resource(self): + return endpoints.FilterPickerResourceEndpoints(self.session) diff --git a/zephyr/squad/squad.py b/zephyr/squad/squad.py new file mode 100644 index 0000000..2fc5536 --- /dev/null +++ b/zephyr/squad/squad.py @@ -0,0 +1,26 @@ +import logging + +from zephyr.squad.zephyr_squad_session import ZephyrSquadSession +from zephyr.squad.server.server_api import ServerApiWrapper +from zephyr.squad.server.actions import ServerActionsWrapper + +DEFAULT_BASE_URL = "https://jira.hosted.com/" + + +class ZephyrSquad: + """ + Zephyr Squad base object to interact with other objects or raw api by its methods. + + :param base_url: base API url to connect with + """ + def __init__(self, base_url=None, **kwargs): + base_url = DEFAULT_BASE_URL if not base_url else base_url + session = ZephyrSquadSession(base_url=base_url, **kwargs) + self.api = ServerApiWrapper(session) + self.actions = ServerActionsWrapper(session) + self.logger = logging.getLogger(__name__) + + @classmethod + def server_api(cls, base_url, **kwargs): + """Alternative constructor for Zephyr Squad Server client""" + return cls(base_url=base_url, **kwargs) diff --git a/zephyr/squad/zephyr_squad_session.py b/zephyr/squad/zephyr_squad_session.py new file mode 100644 index 0000000..638a000 --- /dev/null +++ b/zephyr/squad/zephyr_squad_session.py @@ -0,0 +1,83 @@ +from zephyr.common.zephyr_session import ZephyrSession + +class ZephyrSquadSession(ZephyrSession): + """ + Zephyr Squad basic session object. + + :param base_url: url to make requests to + :param token: auth token + :param username: username + :param password: password + :param cookies: cookie dict + + :keyword session_attrs: a dict with session attrs to be set as keys and their values + """ + + def _paginated_test_label(self, endpoint, params): + max_records = params.get("maxRecords") if params.get("maxRecords") else 10 + remove_no_labels = False + while True: + response = self.get(endpoint, params=params) + if "values" not in response: + return + for value in response.get("values"): + if remove_no_labels and value.get('name') == 'No Label': + continue + remove_no_labels = True + yield value + if len(response.get("values")) + params.get("offset", 0) >= response.get("totalCount"): + break + + if params.get("offset"): + new_offset = params['offset'] + max_records + else: + new_offset = max_records + params.update(offset=new_offset) + + def _paginated_execution(self, endpoint, params): + while True: + response = self.get(endpoint, params=params) + if "executions" not in response or not response.get("executions"): + return + for execution in response.get("executions"): + yield execution + if response.get("linksNew")[-1] == response.get("currentIndex") or \ + response.get("maxResultAllowed") + response.get("offset", 0) \ + >= response.get("totalCount"): + break + + if params.get("offset"): + new_offset = params['offset'] + response.get("maxResultAllowed") + else: + new_offset = response.get("maxResultAllowed") + params.update(offset=new_offset) + + def get_paginated(self, endpoint, query_type, params=None): + """Get paginated data""" + available_types = [ "execution", "test-label" ] + if query_type not in available_types: + raise AttributeError( + f"{query_type} is not a valid query type! Available: {','.join(available_types)}" + ) + + self.logger.debug(f"Get paginated data from endpoint={endpoint} and params={params}") + if params is None: + params = {} + + if query_type == "execution": + return self._paginated_execution(endpoint, params) + return self._paginated_test_label(endpoint, params) + + def post_file(self, endpoint: str, file_path: str, to_files=None, **kwargs): + """ + Post wrapper to send a file. Handles single file opening, + sending its content and closing + """ + with open(file_path, "rb") as file: + content_type = "multipart/form-data" + files = {'file': (file_path, file, content_type)} + + if to_files: + files.update(to_files) + + return self._request("post", endpoint, files=files, **kwargs) diff --git a/zephyr/utils/common.py b/zephyr/utils/common.py index 0ed7c0f..55001f3 100644 --- a/zephyr/utils/common.py +++ b/zephyr/utils/common.py @@ -1,4 +1,5 @@ """Common helper functions to use with the package""" +from copy import deepcopy def cookie_str_to_dict(cookie_str: str) -> dict: @@ -13,3 +14,14 @@ def cookie_str_to_dict(cookie_str: str) -> dict: _key, _value = cookie_substr.strip().split("=", maxsplit=1) cookie_dict.update({_key: _value}) return cookie_dict + + +def dict_merge(source, overwrite): + """Recursively merges 2 dictionaries and return the merged dictionary""" + result = deepcopy(source) + for key, val in overwrite.items(): + if key in result and isinstance(result[key], dict): + result[key] = dict_merge(result[key], val) + else: + result[key] = deepcopy(val) + return result