|
| 1 | +from typing import List |
1 | 2 | from typing import Optional |
2 | 3 | from typing import Tuple |
3 | 4 | from typing import Union |
4 | 5 |
|
5 | 6 | from fastapi.security.utils import get_authorization_scheme_param |
6 | | -from starlette.authentication import AuthCredentials |
7 | 7 | from starlette.authentication import AuthenticationBackend |
8 | 8 | from starlette.middleware.authentication import AuthenticationMiddleware |
9 | 9 | from starlette.requests import Request |
|
16 | 16 | from .utils import jwt_decode |
17 | 17 |
|
18 | 18 |
|
| 19 | +class Auth: |
| 20 | + scopes: List[str] |
| 21 | + |
| 22 | + def __init__(self, scopes: Optional[List[str]] = None) -> None: |
| 23 | + self.scopes = scopes or [] |
| 24 | + |
| 25 | + |
| 26 | +class User(dict): |
| 27 | + is_authenticated: bool |
| 28 | + |
| 29 | + def __init__(self, seq: Optional[dict] = None, **kwargs) -> None: |
| 30 | + self.is_authenticated = seq is not None |
| 31 | + super().__init__(seq or {}, **kwargs) |
| 32 | + |
| 33 | + |
19 | 34 | class OAuth2Backend(AuthenticationBackend): |
20 | | - async def authenticate(self, request: Request) -> Optional[Tuple["AuthCredentials", Optional[dict]]]: |
| 35 | + async def authenticate(self, request: Request) -> Optional[Tuple["Auth", "User"]]: |
21 | 36 | authorization = request.cookies.get("Authorization") |
22 | 37 | scheme, param = get_authorization_scheme_param(authorization) |
23 | 38 |
|
24 | 39 | if not scheme or not param: |
25 | | - return AuthCredentials(), None |
| 40 | + return Auth(), User() |
26 | 41 |
|
27 | | - access_token = jwt_decode(param) |
28 | | - scope = access_token.pop("scope") |
29 | | - return AuthCredentials(scope), access_token |
| 42 | + user = jwt_decode(param) |
| 43 | + scopes = user.pop("scope") |
| 44 | + return Auth(scopes), User(user) |
30 | 45 |
|
31 | 46 |
|
32 | 47 | class OAuth2Middleware: |
|
0 commit comments