@@ -251,28 +251,6 @@ def _make_acls_result(f, futmap):
251251 for resource , fut in futmap .items ():
252252 fut .set_exception (e )
253253
254- @staticmethod
255- def _make_user_scram_credentials_result (f , futmap ):
256- try :
257- results = f .result ()
258- len_results = len (results )
259- len_futures = len (futmap )
260- if len (results ) != len_futures :
261- raise RuntimeError (
262- f"Results length { len_results } is different from future-map length { len_futures } " )
263- for username , value in results .items ():
264- fut = futmap .get (username , None )
265- if fut is None :
266- raise RuntimeError (
267- f"username { username } not found in future-map: { futmap } " )
268- if isinstance (value , KafkaError ):
269- fut .set_exception (KafkaException (value ))
270- else :
271- fut .set_result (value )
272- except Exception as e :
273- for _ , fut in futmap .items ():
274- fut .set_exception (e )
275-
276254 @staticmethod
277255 def _make_futmap_result_from_list (f , futmap ):
278256 try :
@@ -366,6 +344,30 @@ def _make_futures_v2(futmap_keys, class_check, make_result_fn):
366344
367345 return f , futmap
368346
347+ @staticmethod
348+ def _make_single_future_pair ():
349+ """
350+ Create an pair of futures, one for internal usage and one
351+ to use externally, the external one throws a KafkaException if
352+ any of the values in the map returned by the first future is
353+ a KafkaError.
354+ """
355+ def single_future_result (internal_f , f ):
356+ try :
357+ results = internal_f .result ()
358+ for _ , value in results .items ():
359+ if isinstance (value , KafkaError ):
360+ f .set_exception (KafkaException (value ))
361+ return
362+ f .set_result (results )
363+ except Exception as e :
364+ f .set_exception (e )
365+
366+ f = AdminClient ._create_future ()
367+ internal_f = AdminClient ._create_future ()
368+ internal_f .add_done_callback (lambda internal_f : single_future_result (internal_f , f ))
369+ return internal_f , f
370+
369371 @staticmethod
370372 def _has_duplicates (items ):
371373 return len (set (items )) != len (items )
@@ -449,9 +451,13 @@ def _check_alter_consumer_group_offsets_request(request):
449451
450452 @staticmethod
451453 def _check_describe_user_scram_credentials_request (users ):
454+ if users is None :
455+ return
452456 if not isinstance (users , list ):
453457 raise TypeError ("Expected input to be list of String" )
454458 for user in users :
459+ if user is None :
460+ raise TypeError ("'user' cannot be None" )
455461 if not isinstance (user , string_type ):
456462 raise TypeError ("Each value should be a string" )
457463 if not user :
@@ -1094,34 +1100,41 @@ def set_sasl_credentials(self, username, password):
10941100 """
10951101 super (AdminClient , self ).set_sasl_credentials (username , password )
10961102
1097- def describe_user_scram_credentials (self , users , ** kwargs ):
1103+ def describe_user_scram_credentials (self , users = None , ** kwargs ):
10981104 """
10991105 Describe user SASL/SCRAM credentials.
11001106
11011107 :param list(str) users: List of user names to describe.
1102- Duplicate users aren't allowed.
1108+ Duplicate users aren't allowed. Can be None
1109+ to describe all user's credentials.
11031110 :param float request_timeout: The overall request timeout in seconds,
11041111 including broker lookup, request transmission, operation time
11051112 on broker, and response. Default: `socket.timeout.ms*1000.0`
11061113
1107- :returns: A dict of futures keyed by user name.
1108- The future result() method returns the
1109- :class:`UserScramCredentialsDescription` or
1110- raises KafkaException
1114+ :returns: In case None is passed it returns a single future.
1115+ The future yields a dict[str, UserScramCredentialsDescription]
1116+ or raises a KafkaException
11111117
1112- :rtype: dict[str, future]
1118+ In case a list of user names is passed, it returns
1119+ a dict[str, future[UserScramCredentialsDescription]].
1120+ The futures yield a :class:`UserScramCredentialsDescription`
1121+ or raise a KafkaException
1122+
1123+ :rtype: Union[future[dict[str, UserScramCredentialsDescription]],
1124+ dict[str, future[UserScramCredentialsDescription]]]
11131125
11141126 :raises TypeError: Invalid input type.
11151127 :raises ValueError: Invalid input value.
11161128 """
11171129 AdminClient ._check_describe_user_scram_credentials_request (users )
11181130
1119- f , futmap = AdminClient ._make_futures_v2 (users , None ,
1120- AdminClient ._make_user_scram_credentials_result )
1121-
1122- super (AdminClient , self ).describe_user_scram_credentials (users , f , ** kwargs )
1123-
1124- return futmap
1131+ if users is None :
1132+ internal_f , ret_fut = AdminClient ._make_single_future_pair ()
1133+ else :
1134+ internal_f , ret_fut = AdminClient ._make_futures_v2 (users , None ,
1135+ AdminClient ._make_futmap_result )
1136+ super (AdminClient , self ).describe_user_scram_credentials (users , internal_f , ** kwargs )
1137+ return ret_fut
11251138
11261139 def alter_user_scram_credentials (self , alterations , ** kwargs ):
11271140 """
@@ -1146,7 +1159,7 @@ def alter_user_scram_credentials(self, alterations, **kwargs):
11461159 AdminClient ._check_alter_user_scram_credentials_request (alterations )
11471160
11481161 f , futmap = AdminClient ._make_futures_v2 (set ([alteration .user for alteration in alterations ]), None ,
1149- AdminClient ._make_user_scram_credentials_result )
1162+ AdminClient ._make_futmap_result )
11501163
11511164 super (AdminClient , self ).alter_user_scram_credentials (alterations , f , ** kwargs )
11521165 return futmap
0 commit comments