1818
1919import json
2020import re
21+ import warnings
22+ from typing import Dict , List
2123
22- from splunklib import binding
24+ from splunklib import binding , client
2325
2426from . import splunk_rest_client as rest_client
2527from .net_utils import validate_scheme_host_port
@@ -79,7 +81,7 @@ def __init__(
7981 port : int = None ,
8082 ** context : dict ,
8183 ):
82- """Initializes CredentialsManager .
84+ """Initializes CredentialManager .
8385
8486 Arguments:
8587 session_key: Splunk access token.
@@ -123,9 +125,11 @@ def get_password(self, user: str) -> str:
123125 realm='realm_test')
124126 >>> cm.get_password('testuser2')
125127 """
126-
127- all_passwords = self ._get_all_passwords ()
128- for password in all_passwords :
128+ if self ._realm is not None :
129+ passwords = self .get_clear_passwords_in_realm ()
130+ else :
131+ passwords = self .get_clear_passwords ()
132+ for password in passwords :
129133 if password ["username" ] == user and password ["realm" ] == self ._realm :
130134 return password ["clear_password" ]
131135
@@ -182,14 +186,16 @@ def _update_password(self, user: str, password: str):
182186 self ._storage_passwords .create (password , user , self ._realm )
183187 except binding .HTTPError as ex :
184188 if ex .status == 409 :
185- all_passwords = self ._get_all_passwords_in_realm ()
186- for pwd_stanza in all_passwords :
189+ if self ._realm is not None :
190+ passwords = self .get_raw_passwords_in_realm ()
191+ else :
192+ passwords = self .get_raw_passwords ()
193+ for pwd_stanza in passwords :
187194 if pwd_stanza .realm == self ._realm and pwd_stanza .username == user :
188195 pwd_stanza .update (password = password )
189196 return
190197 raise ValueError (
191- "Can not get the password object for realm: %s user: %s"
192- % (self ._realm , user )
198+ f"Can not get the password object for realm: { self ._realm } user: { user } "
193199 )
194200 else :
195201 raise ex
@@ -211,25 +217,61 @@ def delete_password(self, user: str):
211217 realm='realm_test')
212218 >>> cm.delete_password('testuser1')
213219 """
214- all_passwords = self ._get_all_passwords_in_realm ()
220+ if self ._realm is not None :
221+ passwords = self .get_raw_passwords_in_realm ()
222+ else :
223+ passwords = self .get_raw_passwords ()
215224 deleted = False
216225 ent_pattern = re .compile (
217226 r"({}{}\d+)" .format (user .replace ("\\ " , "\\ \\ " ), self .SEP )
218227 )
219- for password in list ( all_passwords ) :
228+ for password in passwords :
220229 match = (user == password .username ) or ent_pattern .match (password .username )
221230 if match and password .realm == self ._realm :
222231 password .delete ()
223232 deleted = True
224233
225234 if not deleted :
226235 raise CredentialNotExistException (
227- "Failed to delete password of realm={}, user={}" .format (
228- self ._realm , user
229- )
236+ f"Failed to delete password of realm={ self ._realm } , user={ user } "
230237 )
231238
232- def _get_all_passwords_in_realm (self ):
239+ def get_raw_passwords (self ) -> List [client .StoragePassword ]:
240+ """Returns all passwords in the "raw" format."""
241+ warnings .warn (
242+ "Please pass realm to the CredentialManager, "
243+ "so it can utilize get_raw_passwords_in_realm method instead."
244+ )
245+ return self ._storage_passwords .list (count = - 1 )
246+
247+ def get_raw_passwords_in_realm (self ) -> List [client .StoragePassword ]:
248+ """Returns all passwords within the realm in the "raw" format."""
249+ if self ._realm is None :
250+ raise ValueError ("No realm was specified" )
251+ return self ._storage_passwords .list (count = - 1 , search = f"realm={ self ._realm } " )
252+
253+ def get_clear_passwords (self ) -> List [Dict [str , str ]]:
254+ """Returns all passwords in the "clear" format."""
255+ warnings .warn (
256+ "Please pass realm to the CredentialManager, "
257+ "so it can utilize get_clear_passwords_in_realm method instead."
258+ )
259+ raw_passwords = self .get_raw_passwords ()
260+ return self ._get_clear_passwords (raw_passwords )
261+
262+ def get_clear_passwords_in_realm (self ) -> List [Dict [str , str ]]:
263+ """Returns all passwords within the realm in the "clear" format."""
264+ if self ._realm is None :
265+ raise ValueError ("No realm was specified" )
266+ raw_passwords = self .get_raw_passwords_in_realm ()
267+ return self ._get_clear_passwords (raw_passwords )
268+
269+ def _get_all_passwords_in_realm (self ) -> List [client .StoragePassword ]:
270+ warnings .warn (
271+ "_get_all_passwords_in_realm is deprecated, "
272+ "please use get_raw_passwords_in_realm instead." ,
273+ stacklevel = 2 ,
274+ )
233275 if self ._realm :
234276 all_passwords = self ._storage_passwords .list (
235277 count = - 1 , search = f"realm={ self ._realm } "
@@ -238,13 +280,12 @@ def _get_all_passwords_in_realm(self):
238280 all_passwords = self ._storage_passwords .list (count = - 1 , search = "" )
239281 return all_passwords
240282
241- @retry (exceptions = [binding .HTTPError ])
242- def _get_all_passwords (self ):
243- all_passwords = self ._storage_passwords .list (count = - 1 )
244-
283+ def _get_clear_passwords (
284+ self , passwords : List [client .StoragePassword ]
285+ ) -> List [Dict [str , str ]]:
245286 results = {}
246287 ptn = re .compile (rf"(.+){ self .SEP } (\d+)" )
247- for password in all_passwords :
288+ for password in passwords :
248289 match = ptn .match (password .name )
249290 if match :
250291 actual_name = match .group (1 ) + ":"
@@ -263,7 +304,7 @@ def _get_all_passwords(self):
263304
264305 # Backward compatibility
265306 # To deal with the password with only one stanza which is generated by the old version.
266- for password in all_passwords :
307+ for password in passwords :
267308 match = ptn .match (password .name )
268309 if (not match ) and (password .name not in results ):
269310 results [password .name ] = {
@@ -289,6 +330,16 @@ def _get_all_passwords(self):
289330
290331 return list (results .values ())
291332
333+ @retry (exceptions = [binding .HTTPError ])
334+ def _get_all_passwords (self ) -> List [Dict [str , str ]]:
335+ warnings .warn (
336+ "_get_all_passwords is deprecated, "
337+ "please use get_all_passwords_in_realm instead." ,
338+ stacklevel = 2 ,
339+ )
340+ passwords = self ._storage_passwords .list (count = - 1 )
341+ return self ._get_clear_passwords (passwords )
342+
292343
293344@retry (exceptions = [binding .HTTPError ])
294345def get_session_key (
@@ -317,7 +368,7 @@ def get_session_key(
317368 ValueError: if scheme, host or port are invalid.
318369
319370 Examples:
320- >>> credentials. get_session_key('user', 'password')
371+ >>> get_session_key('user', 'password')
321372 """
322373 validate_scheme_host_port (scheme , host , port )
323374
0 commit comments