diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a598fec7..92028cd0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/pycqa/flake8 - rev: 5.0.4 + rev: 7.3.0 hooks: - id: flake8 diff --git a/README.md b/README.md index bf311868..9dcd6f1e 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ For more code samples on how to integrate the auth0-python SDK in your Python ap - Rules() ( `Auth0().rules` ) - SelfServiceProfiles() ( `Auth0().self_service_profiles` ) - Stats() ( `Auth0().stats` ) +- Sessions() ( `Auth0().sessions` ) - Tenants() ( `Auth0().tenants` ) - Tickets() ( `Auth0().tickets` ) - UserBlocks() (`Auth0().user_blocks` ) diff --git a/auth0/management/__init__.py b/auth0/management/__init__.py index 761446b6..b1a5fc76 100644 --- a/auth0/management/__init__.py +++ b/auth0/management/__init__.py @@ -24,6 +24,7 @@ from .rules import Rules from .rules_configs import RulesConfigs from .self_service_profiles import SelfServiceProfiles +from .sessions import Sessions from .stats import Stats from .tenants import Tenants from .tickets import Tickets @@ -64,6 +65,7 @@ "Rules", "SelfServiceProfiles", "Stats", + "Sessions", "Tenants", "Tickets", "UserBlocks", diff --git a/auth0/management/sessions.py b/auth0/management/sessions.py new file mode 100644 index 00000000..f626d83a --- /dev/null +++ b/auth0/management/sessions.py @@ -0,0 +1,84 @@ +from typing import Any + +from auth0.rest import RestClient +from auth0.rest import RestClientOptions +from auth0.types import TimeoutType + + +class Sessions: + """Auth0 users endpoints + + Args: + domain (str): Your Auth0 domain, e.g: 'username.auth0.com' + + token (str): Management API v2 Token + + telemetry (bool, optional): Enable or disable Telemetry + (defaults to True) + + timeout (float or tuple, optional): Change the requests + connect and read timeout. Pass a tuple to specify + both values separately or a float to set both to it. + (defaults to 5.0 for both) + + protocol (str, optional): Protocol to use when making requests. + (defaults to "https") + + rest_options (RestClientOptions): Pass an instance of + RestClientOptions to configure additional RestClient + options, such as rate-limit retries. + (defaults to None) + """ + + def __init__( + self, + domain: str, + token: str, + telemetry: bool = True, + timeout: TimeoutType = 5.0, + protocol: str = "https", + rest_options: RestClientOptions | None = None, + ) -> None: + self.domain = domain + self.protocol = protocol + self.client = RestClient(jwt=token, telemetry=telemetry, timeout=timeout, options=rest_options) + + def _url(self, id: str | None = None) -> str: + url = f"{self.protocol}://{self.domain}/api/v2/sessions" + if id is not None: + return f"{url}/{id}" + return url + + def get(self, id: str) -> dict[str, Any]: + """Get a session. + + Args: + id (str): The id of the session to retrieve. + + See: https://auth0.com/docs/api/management/v2#!/Sessions/get-session + """ + + return self.client.get(self._url(id)) + + def delete(self, id: str) -> None: + """Delete a session. + + Args: + id (str): The id of the session to delete. + + See: https://auth0.com/docs/api/management/v2#!/Sessions/delete-session + """ + + return self.client.delete(self._url(id)) + + def revoke(self, id: str) -> None: + """Revokes a session by ID and all associated refresh tokens.. + + Args: + id (str): The id of the session to revoke. + + See: https://auth0.com/docs/api/management/v2#!/Sessions/revoke-session + """ + + url = self._url(f"{id}/sessions") + return self.client.post(url) diff --git a/auth0/management/users.py b/auth0/management/users.py index 2fd9a46a..6a29a92a 100644 --- a/auth0/management/users.py +++ b/auth0/management/users.py @@ -348,7 +348,9 @@ def unlink_user_account(self, id: str, provider: str, user_id: str) -> Any: url = self._url(f"{id}/identities/{provider}/{user_id}") return self.client.delete(url) - def link_user_account(self, user_id: str, body: dict[str, Any]) -> list[dict[str, Any]]: + def link_user_account( + self, user_id: str, body: dict[str, Any] + ) -> list[dict[str, Any]]: """Link user accounts. Links the account specified in the body (secondary account) to the @@ -540,44 +542,66 @@ def delete_authentication_method_by_id( return self.client.delete(url) def list_tokensets( - self, id: str, page: int = 0, per_page: int = 25, include_totals: bool = True - ): - """List all the tokenset(s) associated to the user. - - Args: - id (str): The user's id. - - page (int, optional): The result's page number (zero based). By default, - retrieves the first page of results. - - per_page (int, optional): The amount of entries per page. By default, - retrieves 25 results per page. - - include_totals (bool, optional): True if the query summary is - to be included in the result, False otherwise. Defaults to True. - - See https://auth0.com/docs/api/management/v2#!/Users/get_tokensets - """ - - params = { - "per_page": per_page, - "page": page, - "include_totals": str(include_totals).lower(), - } - url = self._url(f"{id}/federated-connections-tokensets") - return self.client.get(url, params=params) - - def delete_tokenset_by_id( - self, user_id: str, tokenset_id: str - ) -> Any: - """Deletes an tokenset by ID. - - Args: - user_id (str): The user_id to delete an authentication method by ID for. - tokenset_id (str): The tokenset_id to delete an tokenset by ID for. - - See: https://auth0.com/docs/api/management/v2#!/Users/delete_tokenset_by_id - """ - - url = self._url(f"{user_id}/federated-connections-tokensets/{tokenset_id}") - return self.client.delete(url) \ No newline at end of file + self, id: str, page: int = 0, per_page: int = 25, include_totals: bool = True + ): + """List all the tokenset(s) associated to the user. + + Args: + id (str): The user's id. + + page (int, optional): The result's page number (zero based). By default, + retrieves the first page of results. + + per_page (int, optional): The amount of entries per page. By default, + retrieves 25 results per page. + + include_totals (bool, optional): True if the query summary is + to be included in the result, False otherwise. Defaults to True. + + See https://auth0.com/docs/api/management/v2#!/Users/get_tokensets + """ + + params = { + "per_page": per_page, + "page": page, + "include_totals": str(include_totals).lower(), + } + url = self._url(f"{id}/federated-connections-tokensets") + return self.client.get(url, params=params) + + def delete_tokenset_by_id(self, user_id: str, tokenset_id: str) -> Any: + """Deletes an tokenset by ID. + + Args: + user_id (str): The user_id to delete an authentication method by ID for. + tokenset_id (str): The tokenset_id to delete an tokenset by ID for. + + See: https://auth0.com/docs/api/management/v2#!/Users/delete_tokenset_by_id + """ + + url = self._url(f"{user_id}/federated-connections-tokensets/{tokenset_id}") + return self.client.delete(url) + + def get_sessions(self, user_id: str) -> dict[str, Any]: + """Get all sessions details for the given user. + + Args: + user_id (str): The user_id to get all sessions for the given user for. + + see: https://auth0.com/docs/api/management/v2#!/Users/get-sessions-for-user + """ + + url = self._url(f"{user_id}/sessions") + return self.client.get(url) + + def delete_sessions(self, user_id: str) -> dict[str, Any]: + """Delete all sessions for the given user. + + Args: + user_id (str): The user_id to delete all session for the given user for. + + See: https://auth0.com/docs/api/management/v2#!/Users/delete-sessions-for-user + """ + + url = self._url(f"{user_id}/sessions") + return self.client.delete(url) diff --git a/auth0/test/management/test_sessions.py b/auth0/test/management/test_sessions.py new file mode 100644 index 00000000..5ecd0923 --- /dev/null +++ b/auth0/test/management/test_sessions.py @@ -0,0 +1,45 @@ +import unittest +from unittest import mock + +from ...management.sessions import Sessions + + +class TestSessions(unittest.TestCase): + def test_init_with_optionals(self): + t = Sessions(domain="domain", token="jwttoken", telemetry=False, timeout=(10, 2)) + self.assertEqual(t.client.options.timeout, (10, 2)) + telemetry_header = t.client.base_headers.get("Auth0-Client", None) + self.assertEqual(telemetry_header, None) + + @mock.patch("auth0.management.sessions.RestClient") + def test_get(self, mock_rc): + mock_instance = mock_rc.return_value + + u = Sessions(domain="domain", token="jwttoken") + u.get("session_id") + + mock_instance.get.assert_called_with( + "https://domain/api/v2/sessions/session_id" + ) + + @mock.patch("auth0.management.sessions.RestClient") + def test_delete(self, mock_rc): + mock_instance = mock_rc.return_value + + u = Sessions(domain="domain", token="jwttoken") + u.delete("session_id") + + mock_instance.delete.assert_called_with( + "https://domain/api/v2/sessions/session_id" + ) + + @mock.patch("auth0.management.sessions.RestClient") + def test_revoke(self, mock_rc): + mock_instance = mock_rc.return_value + + u = Sessions(domain="domain", token="jwttoken") + u.revoke("session_id") + + mock_instance.post.assert_called_with( + "https://domain/api/v2/sessions/session_id/sessions" + ) diff --git a/auth0/test/management/test_users.py b/auth0/test/management/test_users.py index 9cab65f6..b2fb4449 100644 --- a/auth0/test/management/test_users.py +++ b/auth0/test/management/test_users.py @@ -435,4 +435,26 @@ def test_delete_tokenset_by_id(self, mock_rc): mock_instance.delete.assert_called_with( "https://domain/api/v2/users/user_id/federated-connections-tokensets/tokenset_id" - ) \ No newline at end of file + ) + + @mock.patch("auth0.management.users.RestClient") + def test_get_sessions_by_user(self, mock_rc): + mock_instance = mock_rc.return_value + + u = Users(domain="domain", token="jwttoken") + u.get_sessions("user_id") + + mock_instance.get.assert_called_with( + "https://domain/api/v2/users/user_id/sessions" + ) + + @mock.patch("auth0.management.users.RestClient") + def test_delete_sessions_by_user(self, mock_rc): + mock_instance = mock_rc.return_value + + u = Users(domain="domain", token="jwttoken") + u.delete_sessions("user_id") + + mock_instance.delete.assert_called_with( + "https://domain/api/v2/users/user_id/sessions" + )