diff --git a/README.md b/README.md index 909477b..a68917f 100644 --- a/README.md +++ b/README.md @@ -398,3 +398,42 @@ Flake8 Code tests ``` poetry run flake8 ./tina4_python ``` + +### Swagger Authentication Methods & Middleware + +#### Authentication Methods + +Support is added for Token (Bearer) authentication by default + +If there is a need to pass the token in the header or query parameters then you can add the following decorators to your route + +``` +@headerauth() +``` +You then pass it as a header value. The key needs to be X-API-KEY + +``` +@queryauth() +``` +You then pass it as a query parameter. The parameter needs to be api-key + +#### Middleware + +You can execute middleware when a route is called. Here you can authenticate a user before the default authentication or inject values into the header or request + +There is a file called Middleware.py in the src directory. Add all your middleware here + +To authenticate a user simply add the following to the request + +``` +request["validated"] = True +``` + +To run middleware simply add the function name to the @post, @get, @put etc decorator + +``` +@post('/api/user/authenticate', 'authenticate_user') +``` +Then add a authenticate_user function to Middleware.py and do any checks there + + diff --git a/tina4_python/Auth.py b/tina4_python/Auth.py index 65768c2..fada6fd 100644 --- a/tina4_python/Auth.py +++ b/tina4_python/Auth.py @@ -174,5 +174,11 @@ def validate(self, token): return False - def valid(self, token): + def valid(self, token, override_method = None): + if override_method is not None: + try: + return override_method(token) + except Exception: + return False + return self.validate(token) diff --git a/tina4_python/Router.py b/tina4_python/Router.py index ac75f78..de09a05 100644 --- a/tina4_python/Router.py +++ b/tina4_python/Router.py @@ -75,12 +75,81 @@ async def get_result(url, method, request, headers, session): content = Template.render_twig_template( "errors/403.twig", {"server": {"url": url}}) + current_route = None validated = False + + # Get all the route parameters + for route in tina4_python.tina4_routes.values(): + if route["method"] != method: + continue + Debug("Matching route " + route['route'] + " to " + url, Constant.TINA4_LOG_DEBUG) + if Router.match(url, route['route']): + Debug("Route matched: " + route['route'], Constant.TINA4_LOG_DEBUG) + current_route = route + exit + + # If the route is not found + if current_route is None: + return Response(content, Constant.HTTP_NOT_FOUND, Constant.TEXT_HTML) + + # if we need to execute middleware + if "middleware" in current_route: + if current_route["middleware"] is not None: + middleware = current_route["middleware"] + Debug("Middleware found: " + middleware, Constant.TINA4_LOG_DEBUG) + + try: + import importlib + + module = importlib.import_module("src.Middleware") + + # Get the Middleware class from the module + middleware_class = getattr(module, 'Middleware') + + # Create an instance of Middleware + middleware_instance = middleware_class() + + # Get the middleware function + middleware_function = getattr(middleware_instance, middleware) + + if callable(middleware_function): + # Execute the middleware - We can pass additional parameters in the request + [request, headers] = middleware_function(request, headers) + Debug("Middleware executed", Constant.TINA4_LOG_DEBUG) + else: + Debug("Middleware function is not callable", Constant.TINA4_LOG_DEBUG) + except (AttributeError, ImportError) as e: + Debug(f"Error: {str(e)}", Constant.TINA4_LOG_DEBUG) + + # If the middleware has validated the user then we can carry on + if "validated" in request: + validated = request["validated"] + # check to see if we have an auth ability if "authorization" in headers: - token = headers["authorization"].replace("Bearer", "").strip() - if tina4_python.tina4_auth.valid(token): - validated = True + if "Bearer" in headers["authorization"]: + token = headers["authorization"].replace("Bearer", "").strip() + if tina4_python.tina4_auth.valid(token): + validated = True + + Debug(current_route, Constant.TINA4_LOG_DEBUG) + + # check if we can authorize with an API key in the header + if current_route["swagger"] is not None: + if "headerauth" in current_route["swagger"]: + if current_route["swagger"]["headerauth"]: + if "x-api-key" in headers: + token = headers["x-api-key"].strip() + if tina4_python.tina4_auth.valid(token): + validated = True + + # check if we can authorize with an API key in the query string + if "queryauth" in current_route["swagger"]: + if current_route["swagger"]["queryauth"]: + if "api-key" in request["params"]: + token = request["params"]["api-key"] + if tina4_python.tina4_auth.valid(token): + validated = True if request["params"] is not None and "formToken" in request["params"]: token = request["params"]["formToken"] @@ -113,31 +182,28 @@ async def get_result(url, method, request, headers, session): with open(static_file, 'rb') as file: return Response(file.read(), Constant.HTTP_OK, mime_type) - for route in tina4_python.tina4_routes.values(): - if route["method"] != method: - continue - Debug("Matching route " + route['route'] + " to " + url, Constant.TINA4_LOG_DEBUG) - if Router.match(url, route['route']): - if "swagger" in route and route["swagger"] is not None and "secure" in route["swagger"]: - if route["swagger"]["secure"] and not validated: + # check if we have a current route + if current_route is not None: + if "swagger" in current_route and current_route["swagger"] is not None and "secure" in current_route["swagger"]: + if current_route["swagger"]["secure"] and not validated: + if not validated: return Response(content, Constant.HTTP_FORBIDDEN, Constant.TEXT_HTML) - router_response = route["callback"] + router_response = current_route["callback"] - # Add the inline variables & construct a Request variable - request["params"].update(Router.variables) + # Add the inline variables & construct a Request variable + request["params"].update(Router.variables) - Request.request = request # Add the request object - Request.headers = headers # Add the headers - Request.params = request["params"] - Request.body = request["body"] if "body" in request else None - Request.session = session + Request.request = request # Add the request object + Request.headers = headers # Add the headers + Request.params = request["params"] + Request.body = request["body"] if "body" in request else None + Request.session = session - tina4_python.tina4_current_request = Request - old_stdout = sys.stdout # Memorize the default stdout stream - sys.stdout = buffer = io.StringIO() - result = await router_response(request=Request, response=Response) - break + tina4_python.tina4_current_request = Request + old_stdout = sys.stdout # Memorize the default stdout stream + sys.stdout = buffer = io.StringIO() + result = await router_response(request=Request, response=Response) if result is None: sys.stdout = old_stdout @@ -183,7 +249,7 @@ def clean_url(url): # adds a route to the router @staticmethod - def add(method, route, callback): + def add(method, route, callback, middleware=None): Debug("Adding a route: " + route, Constant.TINA4_LOG_DEBUG) if not callback in tina4_python.tina4_routes: tina4_python.tina4_routes[callback] = {"route": route, "callback": callback, "method": method, "swagger": None} @@ -192,13 +258,17 @@ def add(method, route, callback): tina4_python.tina4_routes[callback]["callback"] = callback tina4_python.tina4_routes[callback]["method"] = method + if middleware is not None: + tina4_python.tina4_routes[callback]["middleware"] = middleware + Debug("Adding Middleware " + middleware, Constant.TINA4_LOG_DEBUG) + if '{' in route: # store the parameters if needed route_variables = re.findall(r'{(.*?)}', route) tina4_python.tina4_routes[callback]["params"] = route_variables -def get(path: str): +def get(path: str, middleware=None): """ Get router :param arguments: @@ -207,13 +277,13 @@ def get(path: str): def actual_get(callback): route_paths = path.split('|') for route_path in route_paths: - Router.add(Constant.TINA4_GET, route_path, callback) + Router.add(Constant.TINA4_GET, route_path, callback, middleware) return callback return actual_get -def post(path): +def post(path, middleware=None): """ Post router :param path: @@ -222,13 +292,12 @@ def post(path): def actual_post(callback): route_paths = path.split('|') for route_path in route_paths: - Router.add(Constant.TINA4_POST, route_path, callback) + Router.add(Constant.TINA4_POST, route_path, callback, middleware) return callback return actual_post - -def put(path): +def put(path, middleware=None): """ Put router :param path: @@ -237,13 +306,13 @@ def put(path): def actual_put(callback): route_paths = path.split('|') for route_path in route_paths: - Router.add(Constant.TINA4_PUT, route_path, callback) + Router.add(Constant.TINA4_PUT, route_path, callback, middleware) return callback return actual_put -def patch(path): +def patch(path, middleware=None): """ Patch router :param path: @@ -252,13 +321,13 @@ def patch(path): def actual_patch(callback): route_paths = path.split('|') for route_path in route_paths: - Router.add(Constant.TINA4_PATCH, route_path, callback) + Router.add(Constant.TINA4_PATCH, route_path, callback, middleware) return callback return actual_patch -def delete(path): +def delete(path, middleware=None): """ Delete router :param path: @@ -267,7 +336,7 @@ def delete(path): def actual_delete(callback): route_paths = path.split('|') for route_path in route_paths: - Router.add(Constant.TINA4_DELETE, route_path, callback) + Router.add(Constant.TINA4_DELETE, route_path, callback, middleware) return callback return actual_delete diff --git a/tina4_python/Swagger.py b/tina4_python/Swagger.py index 1be42b4..6752030 100644 --- a/tina4_python/Swagger.py +++ b/tina4_python/Swagger.py @@ -30,6 +30,14 @@ def add_summary(summary, callback): def add_secure(callback): Swagger.set_swagger_value(callback, "secure", True) + @staticmethod + def add_header_auth(callback): + Swagger.set_swagger_value(callback, "headerauth", True) + + @staticmethod + def add_query_auth(callback): + Swagger.set_swagger_value(callback, "queryauth", True) + @staticmethod def add_tags(tags, callback): Swagger.set_swagger_value(callback, "tags", tags) @@ -62,7 +70,7 @@ def get_path_inputs(route_path): return params @staticmethod - def get_swagger_entry(url, method, tags, summary, description, produces, security, params=None, example=None, + def get_swagger_entry(url, method, tags, summary, description, produces, security, headerauth=None, queryauth=None, params=None, example=None, responses=None): if params is None: @@ -72,9 +80,19 @@ def get_swagger_entry(url, method, tags, summary, description, produces, securit if example is not None: schema = {"type": "object", "example": example} - secure_annotation = [], + secure_annotation = [] + + # If security is defined, add bearerAuth if security: - secure_annotation = [{"bearerAuth": []}]; + secure_annotation.append({"bearerAuth": []}) + + # If we can add API key auth from the header + if headerauth: + secure_annotation.append({"ApiKeyHeader": []}) + + # If we can add API key auth from the query + if queryauth: + secure_annotation.append({"ApiKeyQuery": []}) new_params = [] for param in params: @@ -123,6 +141,10 @@ def parse_swagger(swagger): swagger["example"] = None if not "secure" in swagger: swagger["secure"] = None + if not "headerauth" in swagger: + swagger["headerauth"] = None + if not "queryauth" in swagger: + swagger["queryauth"] = None if isinstance(swagger["tags"], str): swagger["tags"] = [swagger["tags"]] @@ -132,6 +154,7 @@ def parse_swagger(swagger): @staticmethod def get_json(request): paths = {} + apikey_auth = False # If we have any routes that require api key auth for route in tina4_python.tina4_routes.values(): if "swagger" in route: @@ -139,6 +162,16 @@ def get_json(request): swagger = Swagger.parse_swagger(route["swagger"]) produces = {} + header_auth = False + + if swagger["headerauth"]: + header_auth = True + + query_auth = False + + if swagger["queryauth"]: + query_auth = True + responses = { "200": {"description": "Success"}, "400": {"description": "Failed"} @@ -154,6 +187,8 @@ def get_json(request): ["application/json", "html/text"], swagger["secure"], + swagger["headerauth"], + swagger["queryauth"], swagger["params"], swagger["example"], responses) @@ -172,11 +207,40 @@ def get_json(request): "version": os.getenv("SWAGGER_VERSION", "1.0.0(SWAGGER_VERSION)") }, "components": { - "securitySchemes": {"bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}}}, + "securitySchemes": {} + }, "basePath": "", "paths": paths } + # Populate the security schemes + if header_auth: + json_object["components"]["securitySchemes"]["ApiKeyHeader"] = { + "type": "apiKey", + "in": "header", + "name": "X-API-KEY" + } + + if query_auth: + json_object["components"]["securitySchemes"]["ApiKeyQuery"] = { + "type": "apiKey", + "in": "query", + "name": "api-key" + } + + # Now you can set the security requirements for your API paths + json_object["security"] = [] + + # Example of how to apply both security schemes globally or to specific operations + if header_auth and query_auth: + json_object["security"].append({"ApiKeyHeader": [], "ApiKeyQuery": []}) + elif header_auth: + json_object["security"].append({"ApiKeyHeader": []}) + elif query_auth: + json_object["security"].append({"ApiKeyQuery": []}) + + json_object["components"]["securitySchemes"]["bearerAuth"] = {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"} + return json_object @@ -203,6 +267,21 @@ def actual_secure(callback): return actual_secure +# Pass the api key as a header +def headerauth(): + def actual_header_auth(callback): + Swagger.add_header_auth(callback) + return callback + + return actual_header_auth + +# Pass the api key as a query parameter +def queryauth(): + def actual_query_auth(callback): + Swagger.add_query_auth(callback) + return callback + + return actual_query_auth def tags(tags): def actual_tags(callback): diff --git a/tina4_python/__init__.py b/tina4_python/__init__.py index c08f337..7c1e7d5 100644 --- a/tina4_python/__init__.py +++ b/tina4_python/__init__.py @@ -90,6 +90,19 @@ app_file.write('from tina4_python import *') app_file.write('\n') + # Middleware + with open(root_path + os.sep + "src" + os.sep + "Middleware.py", 'w') as middleware_file: + middleware_file.write('# Middleware class to inject values into headers and request') + middleware_file.write('\n') + middleware_file.write('class Middleware:') + middleware_file.write('\n') + middleware_file.write('# def sample_inject(self, request, headers):') + middleware_file.write('\n') + middleware_file.write('# Do something here') + middleware_file.write('\n') + middleware_file.write('# return request, headers') + middleware_file.write('\n') + if not os.path.exists(root_path + os.sep + "src" + os.sep + "app"): os.makedirs(root_path + os.sep + "src" + os.sep + "app")