|
2 | 2 |
|
3 | 3 | from gogs_client._implementation.http_utils import RelativeHttpRequestor, append_url |
4 | 4 | from gogs_client.entities import GogsUser, GogsRepo |
| 5 | +from gogs_client.auth import Token |
5 | 6 |
|
6 | 7 |
|
7 | 8 | class GogsApi(object): |
@@ -43,6 +44,68 @@ def authenticated_user(self, auth): |
43 | 44 | response = self._get("/user", auth=auth) |
44 | 45 | return GogsUser.from_json(self._check_ok(response).json()) |
45 | 46 |
|
| 47 | + def get_tokens(self, auth, username=None): |
| 48 | + """ |
| 49 | + Returns tokens defined for specified user. |
| 50 | + If no user specified uses user authenticated by the given authentication. |
| 51 | + Right now, authentication must be UsernamePassword (not Token). |
| 52 | +
|
| 53 | + :param auth.Authentication auth: authentication for user to retrieve |
| 54 | + :param str username: username of owner of tokens |
| 55 | +
|
| 56 | + :return: list of token representation |
| 57 | + :rtype: List[Token] |
| 58 | + :raises NetworkFailure: if there is an error communicating with the server |
| 59 | + :raises ApiFailure: if the request cannot be serviced |
| 60 | + """ |
| 61 | + if username is None: |
| 62 | + username = self.authenticated_user(auth).username |
| 63 | + response = self._get("/users/{u}/tokens".format(u=username), auth=auth) |
| 64 | + return [Token.from_json(o) for o in self._check_ok(response).json()] |
| 65 | + |
| 66 | + def create_token(self, auth, name, username=None): |
| 67 | + """ |
| 68 | + Creates new token with specified name for specified user. |
| 69 | + If no user specified uses user authenticated by the given authentication. |
| 70 | + Right now, authentication must be UsernamePassword (not Token). |
| 71 | +
|
| 72 | + :param auth.Authentication auth: authentication for user to retrieve |
| 73 | + :param str name: name of new token |
| 74 | + :param str username: username of owner of new token |
| 75 | +
|
| 76 | + :return: new token representation |
| 77 | + :rtype: Token |
| 78 | + :raises NetworkFailure: if there is an error communicating with the server |
| 79 | + :raises ApiFailure: if the request cannot be serviced |
| 80 | + """ |
| 81 | + if username is None: |
| 82 | + username = self.authenticated_user(auth).username |
| 83 | + data = {"name": name} |
| 84 | + response = self._post("/users/{u}/tokens".format(u=username), auth=auth, data=data) |
| 85 | + return Token.from_json(self._check_ok(response).json()) |
| 86 | + |
| 87 | + def ensure_token(self, auth, name, username=None): |
| 88 | + """ |
| 89 | + Creates new token if token with specified name for specified user does not exists. |
| 90 | + If no user specified uses user authenticated by the given authentication. |
| 91 | + Right now, authentication must be UsernamePassword (not Token). |
| 92 | +
|
| 93 | + :param auth.Authentication auth: authentication for user to retrieve |
| 94 | + :param str name: name of new token |
| 95 | + :param str username: username of owner of new token |
| 96 | +
|
| 97 | + :return: token representation |
| 98 | + :rtype: Token |
| 99 | + :raises NetworkFailure: if there is an error communicating with the server |
| 100 | + :raises ApiFailure: if the request cannot be serviced |
| 101 | + """ |
| 102 | + if username is None: |
| 103 | + username = self.authenticated_user(auth).username |
| 104 | + tokens = [token for token in self.get_tokens(auth, username) if token.name == name] |
| 105 | + if tokens: |
| 106 | + return tokens[0] |
| 107 | + return self.create_token(auth, name, username) |
| 108 | + |
46 | 109 | def create_repo(self, auth, name, description=None, private=False, auto_init=False, |
47 | 110 | gitignore_templates=None, license_template=None, readme_template=None): |
48 | 111 | """ |
|
0 commit comments