From d2c961aff5d176d17570beb513b043759eed6b7c Mon Sep 17 00:00:00 2001 From: CK Ng Date: Fri, 24 Feb 2023 23:55:03 +0800 Subject: [PATCH 1/4] Add option to decode access token --- .gitignore | 2 ++ README.rst | 4 ++++ src/django_cognito_jwt/backend.py | 1 + src/django_cognito_jwt/validator.py | 24 ++++++++++++++++-------- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 8ed2328..245d384 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ .cache .python-version .venv +venv/ .idea/ /build/ @@ -17,3 +18,4 @@ # Editors .idea/ +.vscode/ diff --git a/README.rst b/README.rst index a8f9f6b..b3423a2 100644 --- a/README.rst +++ b/README.rst @@ -72,3 +72,7 @@ you can use the ``COGNITO_USER_MODEL`` setting. .. code-block:: python COGNITO_USER_MODEL = "myproject.AppUser" + +The library by default uses id token. To use access token, add the following lines to your Django ``settings.py`` file: +.. {'id', 'access'} Default: 'id' +COGNITO_TOKEN_TYPE = 'access' \ No newline at end of file diff --git a/src/django_cognito_jwt/backend.py b/src/django_cognito_jwt/backend.py index 941c0f5..c66fd54 100644 --- a/src/django_cognito_jwt/backend.py +++ b/src/django_cognito_jwt/backend.py @@ -58,6 +58,7 @@ def get_token_validator(self, request): settings.COGNITO_AWS_REGION, settings.COGNITO_USER_POOL, settings.COGNITO_AUDIENCE, + settings.COGNITO_TOKEN_TYPE, ) def authenticate_header(self, request): diff --git a/src/django_cognito_jwt/validator.py b/src/django_cognito_jwt/validator.py index 80d7546..9d4fc79 100644 --- a/src/django_cognito_jwt/validator.py +++ b/src/django_cognito_jwt/validator.py @@ -1,4 +1,5 @@ import json +from typing import Literal import jwt import requests @@ -13,10 +14,14 @@ class TokenError(Exception): class TokenValidator: - def __init__(self, aws_region, aws_user_pool, audience): + def __init__(self, aws_region, aws_user_pool, audience, token_type: Literal["id", "access"] = "id"): self.aws_region = aws_region self.aws_user_pool = aws_user_pool self.audience = audience + self.token_type = token_type + + if token_type not in ["id", "access"]: + raise TokenError("Invalid token type. Choose either id or access token.") @cached_property def pool_url(self): @@ -58,13 +63,16 @@ def validate(self, token): raise TokenError("No key found for this token") try: - jwt_data = jwt.decode( - token, - public_key, - audience=self.audience, - issuer=self.pool_url, - algorithms=["RS256"], - ) + params = { + "jwt": token, + "key": public_key, + "issuer": self.pool_url, + "algorithms": ["RS256"] + } + if self.token_type == "id": + params.update({"audience": self.audience}) + + jwt_data = jwt.decode(**params) except ( jwt.InvalidTokenError, jwt.ExpiredSignatureError, From ade07ec38775b028fc6281d2eeb78875b432a6da Mon Sep 17 00:00:00 2001 From: CK Ng Date: Sat, 25 Feb 2023 00:00:18 +0800 Subject: [PATCH 2/4] Update readme --- README.rst | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index b3423a2..1b9bd08 100644 --- a/README.rst +++ b/README.rst @@ -74,5 +74,7 @@ you can use the ``COGNITO_USER_MODEL`` setting. COGNITO_USER_MODEL = "myproject.AppUser" The library by default uses id token. To use access token, add the following lines to your Django ``settings.py`` file: -.. {'id', 'access'} Default: 'id' -COGNITO_TOKEN_TYPE = 'access' \ No newline at end of file + +.. code-block:: python + + COGNITO_TOKEN_TYPE = "access" # '{'id', 'access'} Default: 'id' From ed5866f639511ea7783cf9db6515466514d79f40 Mon Sep 17 00:00:00 2001 From: CK Ng Date: Sun, 26 Feb 2023 11:10:06 +0800 Subject: [PATCH 3/4] Add client_id validation for access token, and get_user_info method. The client_id from access token will be validated whether it matches the one defined in the `settings.COGNITO_AUDIENCE`. When using access token, the backend will call the userinfo endpoint from Amazon Cognito, to obtain the user info based on the access key, and pass into the payload inside the Django user_model's get_or_create_for_cognito method. --- README.rst | 10 +++++++++- src/django_cognito_jwt/backend.py | 18 +++++++++++++++++- src/django_cognito_jwt/validator.py | 5 +++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 1b9bd08..896f4e0 100644 --- a/README.rst +++ b/README.rst @@ -77,4 +77,12 @@ The library by default uses id token. To use access token, add the following lin .. code-block:: python - COGNITO_TOKEN_TYPE = "access" # '{'id', 'access'} Default: 'id' + COGNITO_TOKEN_TYPE = "access" # {'id', 'access'}, default 'id' + + +As the payload of access token only contains basic user info, we could obtain further info from the `UserInfo endpoint`. +You need to specify the Cognito domain in the ``settings.py`` file to obtain the user info from the endpoint, as follows: + +.. code-block:: python + + COGNITO_DOMAIN = "your-user-pool-domain" # eg, exampledomain.auth.ap-southeast-1.amazoncognito.com diff --git a/src/django_cognito_jwt/backend.py b/src/django_cognito_jwt/backend.py index c66fd54..97173ae 100644 --- a/src/django_cognito_jwt/backend.py +++ b/src/django_cognito_jwt/backend.py @@ -1,4 +1,6 @@ import logging +import requests +import json from django.apps import apps as django_apps from django.conf import settings @@ -29,12 +31,26 @@ def authenticate(self, request): raise exceptions.AuthenticationFailed() USER_MODEL = self.get_user_model() - user = USER_MODEL.objects.get_or_create_for_cognito(jwt_payload) + if settings.COGNITO_TOKEN_TYPE == "access": + user_info = self.get_user_info(jwt_token.decode("UTF-8")) + user_info = json.loads(user_info.decode("UTF-8")) + user = USER_MODEL.objects.get_or_create_for_cognito(user_info) + else: + user = USER_MODEL.objects.get_or_create_for_cognito(jwt_payload) return (user, jwt_token) def get_user_model(self): user_model = getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) return django_apps.get_model(user_model, require_ready=False) + + def get_user_info(self, access_token): + if settings.COGNITO_TOKEN_TYPE == "access": + url = f"https://{settings.COGNITO_DOMAIN}/oauth2/userInfo" + + headers = {'Authorization': f'Bearer {access_token}'} + + res = requests.get(url, headers=headers) + return res.content def get_jwt_token(self, request): auth = get_authorization_header(request).split() diff --git a/src/django_cognito_jwt/validator.py b/src/django_cognito_jwt/validator.py index 9d4fc79..df3e120 100644 --- a/src/django_cognito_jwt/validator.py +++ b/src/django_cognito_jwt/validator.py @@ -73,6 +73,11 @@ def validate(self, token): params.update({"audience": self.audience}) jwt_data = jwt.decode(**params) + if self.token_type == "access": + if "access" not in jwt_data["token_use"]: + raise TokenError("Incorrect token use") + if jwt_data["client_id"] not in self.audience: + raise TokenError("Incorrect client_id") except ( jwt.InvalidTokenError, jwt.ExpiredSignatureError, From a6756b6217713fec1609e4ad26ee999dfb11c851 Mon Sep 17 00:00:00 2001 From: CK Ng Date: Sun, 26 Feb 2023 23:00:17 +0800 Subject: [PATCH 4/4] Split the get_or_create_for_cognito method. Split the method into get_user and create_for_cognito. Spliting the method into two allows more efficient calling of the method, especially when using access token. The get_user method will first check whether the user exists. If exists, it will directly skip the get requests from UserInfo endpoint, and also skip the attempt for user creation, and directly return the user object. The UserInfo endpoint will be only called during the initial registeration of the client that does not exist in User database. --- README.rst | 40 +++++++++++++++++++++++++++++++ src/django_cognito_jwt/backend.py | 17 ++++++++----- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 896f4e0..895c6e8 100644 --- a/README.rst +++ b/README.rst @@ -86,3 +86,43 @@ You need to specify the Cognito domain in the ``settings.py`` file to obtain the .. code-block:: python COGNITO_DOMAIN = "your-user-pool-domain" # eg, exampledomain.auth.ap-southeast-1.amazoncognito.com + +To use the backend functions, at the DJANGO_USER_MODEL, could define methods as follows: + +.. code-block:: python + + class CustomizedUserManager(UserManager): + def get_user(self, payload): + cognito_id = payload['sub'] + try: + return self.get(cognito_id=cognito_id) + except self.model.DoesNotExist: + return None + + def create_for_cognito(self, payload): + """Get any value from `payload` here + ipdb> pprint(payload) + {'aud': '159ufjrihgehb67sn373aotli7', + 'auth_time': 1583503962, + 'cognito:username': 'john-rambo', + 'email': 'foggygiga@gmail.com', + 'email_verified': True, + 'event_id': 'd92a99c2-c49e-4312-8a57-c0dccb84f1c3', + 'exp': 1583507562, + 'iat': 1583503962, + 'iss': 'https://cognito-idp.us-west-2.amazonaws.com/us-west-2_flCJaoDig', + 'sub': '2e4790a0-35a4-45d7-b10c-ced79be22e94', + 'token_use': 'id'} + """ + cognito_id = payload['sub'] + + try: + user = self.create( + username= payload["cognito:username"] if payload.get("cognito:username") else payload["username"], + cognito_id=cognito_id, + email=payload['email'], + is_active=True) + except IntegrityError: + user = self.get(cognito_id=cognito_id) + + return user \ No newline at end of file diff --git a/src/django_cognito_jwt/backend.py b/src/django_cognito_jwt/backend.py index 97173ae..8c5a3f4 100644 --- a/src/django_cognito_jwt/backend.py +++ b/src/django_cognito_jwt/backend.py @@ -31,12 +31,17 @@ def authenticate(self, request): raise exceptions.AuthenticationFailed() USER_MODEL = self.get_user_model() - if settings.COGNITO_TOKEN_TYPE == "access": - user_info = self.get_user_info(jwt_token.decode("UTF-8")) - user_info = json.loads(user_info.decode("UTF-8")) - user = USER_MODEL.objects.get_or_create_for_cognito(user_info) - else: - user = USER_MODEL.objects.get_or_create_for_cognito(jwt_payload) + user = USER_MODEL.objects.get_user(jwt_payload) + if not user: + # Create new user if not exists + payload = jwt_payload + if settings.COGNITO_TOKEN_TYPE == "access": + user_info = self.get_user_info(jwt_token.decode("UTF-8")) + user_info = json.loads(user_info.decode("UTF-8")) + payload = user_info + + user = USER_MODEL.objects.create_for_cognito(payload) + return (user, jwt_token) def get_user_model(self):