|
1 | | -# -------------------------------------------------------------------------------------------- |
2 | | -# Copyright (c) Microsoft Corporation. All rights reserved. |
3 | | -# Licensed under the MIT License. See LICENSE in the project root for license information. |
4 | | -# -------------------------------------------------------------------------------------------- |
| 1 | +"""Wrapper to interface with the artifacts authentication helper.""" |
5 | 2 |
|
6 | 3 | from __future__ import absolute_import |
7 | 4 |
|
|
18 | 15 |
|
19 | 16 | @dataclass |
20 | 17 | class Credentials: |
| 18 | + """A set of credentials, consisting of a username and password.""" |
| 19 | + |
21 | 20 | username: str |
22 | 21 | password: str |
23 | 22 |
|
24 | 23 |
|
25 | 24 | class ArtifactsHelperCredentialProviderError(RuntimeError): |
26 | | - pass |
| 25 | + """Generic error for ArtifactsHelperCredentialProvider.""" |
27 | 26 |
|
28 | 27 |
|
29 | 28 | class ArtifactsHelperCredentialProvider: |
| 29 | + """A wrapper retrieve credentials from the artifacts authentication helper. |
| 30 | +
|
| 31 | + The authentication helper should be installed from |
| 32 | + https://github.com/microsoft/ado-codespaces-auth. |
| 33 | +
|
| 34 | + Attributes: |
| 35 | + DEFAULT_AUTH_HELPER_PATH: The default path to the authentication helper |
| 36 | + executable. |
| 37 | +
|
| 38 | + Raises: |
| 39 | + ArtifactsHelperCredentialProviderError: When the credentials could not be |
| 40 | + retrieved. |
| 41 | + """ |
| 42 | + |
30 | 43 | DEFAULT_AUTH_HELPER_PATH = "~/ado-auth-helper" |
31 | 44 |
|
32 | 45 | def __init__( |
33 | 46 | self, |
34 | 47 | auth_helper_path: Union[os.PathLike, str] = DEFAULT_AUTH_HELPER_PATH, |
35 | 48 | timeout: int = 30, |
36 | 49 | ): |
| 50 | + """Initialise the provider. |
| 51 | +
|
| 52 | + Args: |
| 53 | + auth_helper_path: The path to the authentication helper executable, or the |
| 54 | + name of the executable if it is in the PATH. Defaults to |
| 55 | + DEFAULT_AUTH_HELPER_PATH. |
| 56 | +
|
| 57 | + timeout: The timeout in seconds for calling the authentication helper and |
| 58 | + any HTTP requests made to test credentials. Defaults to 30. |
| 59 | + """ |
37 | 60 | self.auth_tool_path = self.resolve_auth_helper_path(auth_helper_path) |
38 | 61 | self.timeout = timeout |
39 | 62 |
|
40 | 63 | @staticmethod |
41 | 64 | def resolve_auth_helper_path( |
42 | 65 | auth_helper_path: Union[os.PathLike, str], |
43 | 66 | ) -> Optional[str]: |
| 67 | + """Resolve the path to the authentication helper executable. |
| 68 | +
|
| 69 | + Returns: |
| 70 | + The path to the authentication helper executable, or `None` if it is not |
| 71 | + executable or not found. |
| 72 | + """ |
44 | 73 | return shutil.which(str(Path(auth_helper_path).expanduser()), mode=os.X_OK) |
45 | 74 |
|
46 | 75 | @classmethod |
47 | 76 | def auth_helper_installed(cls, auth_helper_path: Union[os.PathLike, str]) -> bool: |
| 77 | + """Check whether the authentication helper is installed and executable.""" |
48 | 78 | return cls.resolve_auth_helper_path(auth_helper_path) is not None |
49 | 79 |
|
50 | | - def get_credentials(self, url) -> Optional[Credentials]: |
51 | | - # Public feed short circuit: return nothing if not getting credentials for the upload endpoint |
52 | | - # (which always requires auth) and the endpoint is public (can authenticate without credentials). |
| 80 | + def get_credentials(self, url: str) -> Optional[Credentials]: |
| 81 | + """Get credentials for the given URL. |
| 82 | +
|
| 83 | + Args: |
| 84 | + url: The URL to retrieve credentials for. |
| 85 | +
|
| 86 | + Returns: |
| 87 | + The credentials for the URL, or `None` if no credentials could be retrieved. |
| 88 | + """ |
| 89 | + # Public feed short circuit: return nothing if not getting credentials for the |
| 90 | + # upload endpoint (which always requires auth) and the endpoint is public (can |
| 91 | + # authenticate without credentials). |
53 | 92 | if not self._is_upload_endpoint(url) and self._can_authenticate(url, None): |
54 | 93 | return None |
55 | 94 |
|
@@ -87,16 +126,19 @@ def _get_jwt_from_helper(self) -> str: |
87 | 126 | return stdout.strip() |
88 | 127 | else: |
89 | 128 | raise ArtifactsHelperCredentialProviderError( |
90 | | - f"Failed to get credentials: No output from subprocess {self.auth_tool_path}" |
| 129 | + "Failed to get credentials: " |
| 130 | + f"No output from subprocess {self.auth_tool_path}" |
91 | 131 | ) |
92 | 132 |
|
93 | 133 | except subprocess.CalledProcessError as e: |
94 | 134 | raise ArtifactsHelperCredentialProviderError( |
95 | | - f"Failed to get credentials: Process {self.auth_tool_path} exited with code {e.returncode}. Error: {e.stderr}" |
| 135 | + f"Failed to get credentials: Process {self.auth_tool_path} exited with " |
| 136 | + f"code {e.returncode}. Error: {e.stderr}" |
96 | 137 | ) from e |
97 | 138 | except subprocess.TimeoutExpired as e: |
98 | 139 | raise ArtifactsHelperCredentialProviderError( |
99 | | - f"Failed to get credentials: Process {self.auth_tool_path} timed out after {self.timeout} seconds" |
| 140 | + f"Failed to get credentials: Process {self.auth_tool_path} timed out " |
| 141 | + f"after {self.timeout} seconds" |
100 | 142 | ) from e |
101 | 143 |
|
102 | 144 | def _get_credentials_from_jwt(self, jwt_str: str) -> Credentials: |
|
0 commit comments