Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,42 @@ Flake8 Code tests
```
poetry run flake8 ./tina4_python
```

### Swagger Authentication Methods & Middleware

#### Authentication Methods

Support is added for Token (Bearer) authentication by default

If there is a need to pass the token in the header or query parameters then you can add the following decorators to your route

```
@headerauth()
```
You then pass it as a header value. The key needs to be X-API-KEY

```
@queryauth()
```
You then pass it as a query parameter. The parameter needs to be api-key

#### Middleware

You can execute middleware when a route is called. Here you can authenticate a user before the default authentication or inject values into the header or request

There is a file called Middleware.py in the src directory. Add all your middleware here

To authenticate a user simply add the following to the request

```
request["validated"] = True
```

To run middleware simply add the function name to the @post, @get, @put etc decorator

```
@post('/api/user/authenticate', 'authenticate_user')
```
Then add a authenticate_user function to Middleware.py and do any checks there


8 changes: 7 additions & 1 deletion tina4_python/Auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,11 @@ def validate(self, token):

return False

def valid(self, token):
def valid(self, token, override_method = None):
if override_method is not None:
try:
return override_method(token)
except Exception:
return False

return self.validate(token)
139 changes: 104 additions & 35 deletions tina4_python/Router.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,81 @@ async def get_result(url, method, request, headers, session):
content = Template.render_twig_template(
"errors/403.twig", {"server": {"url": url}})

current_route = None
validated = False

# Get all the route parameters
for route in tina4_python.tina4_routes.values():
if route["method"] != method:
continue
Debug("Matching route " + route['route'] + " to " + url, Constant.TINA4_LOG_DEBUG)
if Router.match(url, route['route']):
Debug("Route matched: " + route['route'], Constant.TINA4_LOG_DEBUG)
current_route = route
exit

# If the route is not found
if current_route is None:
return Response(content, Constant.HTTP_NOT_FOUND, Constant.TEXT_HTML)

# if we need to execute middleware
if "middleware" in current_route:
if current_route["middleware"] is not None:
middleware = current_route["middleware"]
Debug("Middleware found: " + middleware, Constant.TINA4_LOG_DEBUG)

try:
import importlib

module = importlib.import_module("src.Middleware")

# Get the Middleware class from the module
middleware_class = getattr(module, 'Middleware')

# Create an instance of Middleware
middleware_instance = middleware_class()

# Get the middleware function
middleware_function = getattr(middleware_instance, middleware)

if callable(middleware_function):
# Execute the middleware - We can pass additional parameters in the request
[request, headers] = middleware_function(request, headers)
Debug("Middleware executed", Constant.TINA4_LOG_DEBUG)
else:
Debug("Middleware function is not callable", Constant.TINA4_LOG_DEBUG)
except (AttributeError, ImportError) as e:
Debug(f"Error: {str(e)}", Constant.TINA4_LOG_DEBUG)

# If the middleware has validated the user then we can carry on
if "validated" in request:
validated = request["validated"]

# check to see if we have an auth ability
if "authorization" in headers:
token = headers["authorization"].replace("Bearer", "").strip()
if tina4_python.tina4_auth.valid(token):
validated = True
if "Bearer" in headers["authorization"]:
token = headers["authorization"].replace("Bearer", "").strip()
if tina4_python.tina4_auth.valid(token):
validated = True

Debug(current_route, Constant.TINA4_LOG_DEBUG)

# check if we can authorize with an API key in the header
if current_route["swagger"] is not None:
if "headerauth" in current_route["swagger"]:
if current_route["swagger"]["headerauth"]:
if "x-api-key" in headers:
token = headers["x-api-key"].strip()
if tina4_python.tina4_auth.valid(token):
validated = True

# check if we can authorize with an API key in the query string
if "queryauth" in current_route["swagger"]:
if current_route["swagger"]["queryauth"]:
if "api-key" in request["params"]:
token = request["params"]["api-key"]
if tina4_python.tina4_auth.valid(token):
validated = True

if request["params"] is not None and "formToken" in request["params"]:
token = request["params"]["formToken"]
Expand Down Expand Up @@ -113,31 +182,28 @@ async def get_result(url, method, request, headers, session):
with open(static_file, 'rb') as file:
return Response(file.read(), Constant.HTTP_OK, mime_type)

for route in tina4_python.tina4_routes.values():
if route["method"] != method:
continue
Debug("Matching route " + route['route'] + " to " + url, Constant.TINA4_LOG_DEBUG)
if Router.match(url, route['route']):
if "swagger" in route and route["swagger"] is not None and "secure" in route["swagger"]:
if route["swagger"]["secure"] and not validated:
# check if we have a current route
if current_route is not None:
if "swagger" in current_route and current_route["swagger"] is not None and "secure" in current_route["swagger"]:
if current_route["swagger"]["secure"] and not validated:
if not validated:
return Response(content, Constant.HTTP_FORBIDDEN, Constant.TEXT_HTML)

router_response = route["callback"]
router_response = current_route["callback"]

# Add the inline variables & construct a Request variable
request["params"].update(Router.variables)
# Add the inline variables & construct a Request variable
request["params"].update(Router.variables)

Request.request = request # Add the request object
Request.headers = headers # Add the headers
Request.params = request["params"]
Request.body = request["body"] if "body" in request else None
Request.session = session
Request.request = request # Add the request object
Request.headers = headers # Add the headers
Request.params = request["params"]
Request.body = request["body"] if "body" in request else None
Request.session = session

tina4_python.tina4_current_request = Request
old_stdout = sys.stdout # Memorize the default stdout stream
sys.stdout = buffer = io.StringIO()
result = await router_response(request=Request, response=Response)
break
tina4_python.tina4_current_request = Request
old_stdout = sys.stdout # Memorize the default stdout stream
sys.stdout = buffer = io.StringIO()
result = await router_response(request=Request, response=Response)

if result is None:
sys.stdout = old_stdout
Expand Down Expand Up @@ -183,7 +249,7 @@ def clean_url(url):

# adds a route to the router
@staticmethod
def add(method, route, callback):
def add(method, route, callback, middleware=None):
Debug("Adding a route: " + route, Constant.TINA4_LOG_DEBUG)
if not callback in tina4_python.tina4_routes:
tina4_python.tina4_routes[callback] = {"route": route, "callback": callback, "method": method, "swagger": None}
Expand All @@ -192,13 +258,17 @@ def add(method, route, callback):
tina4_python.tina4_routes[callback]["callback"] = callback
tina4_python.tina4_routes[callback]["method"] = method

if middleware is not None:
tina4_python.tina4_routes[callback]["middleware"] = middleware
Debug("Adding Middleware " + middleware, Constant.TINA4_LOG_DEBUG)

if '{' in route: # store the parameters if needed
route_variables = re.findall(r'{(.*?)}', route)
tina4_python.tina4_routes[callback]["params"] = route_variables



def get(path: str):
def get(path: str, middleware=None):
"""
Get router
:param arguments:
Expand All @@ -207,13 +277,13 @@ def get(path: str):
def actual_get(callback):
route_paths = path.split('|')
for route_path in route_paths:
Router.add(Constant.TINA4_GET, route_path, callback)
Router.add(Constant.TINA4_GET, route_path, callback, middleware)
return callback

return actual_get


def post(path):
def post(path, middleware=None):
"""
Post router
:param path:
Expand All @@ -222,13 +292,12 @@ def post(path):
def actual_post(callback):
route_paths = path.split('|')
for route_path in route_paths:
Router.add(Constant.TINA4_POST, route_path, callback)
Router.add(Constant.TINA4_POST, route_path, callback, middleware)
return callback

return actual_post


def put(path):
def put(path, middleware=None):
"""
Put router
:param path:
Expand All @@ -237,13 +306,13 @@ def put(path):
def actual_put(callback):
route_paths = path.split('|')
for route_path in route_paths:
Router.add(Constant.TINA4_PUT, route_path, callback)
Router.add(Constant.TINA4_PUT, route_path, callback, middleware)
return callback

return actual_put


def patch(path):
def patch(path, middleware=None):
"""
Patch router
:param path:
Expand All @@ -252,13 +321,13 @@ def patch(path):
def actual_patch(callback):
route_paths = path.split('|')
for route_path in route_paths:
Router.add(Constant.TINA4_PATCH, route_path, callback)
Router.add(Constant.TINA4_PATCH, route_path, callback, middleware)
return callback

return actual_patch


def delete(path):
def delete(path, middleware=None):
"""
Delete router
:param path:
Expand All @@ -267,7 +336,7 @@ def delete(path):
def actual_delete(callback):
route_paths = path.split('|')
for route_path in route_paths:
Router.add(Constant.TINA4_DELETE, route_path, callback)
Router.add(Constant.TINA4_DELETE, route_path, callback, middleware)
return callback

return actual_delete
Loading