44import hashlib
55import hmac
66import json
7+ import logging
78import string
89
910# needed for AWS_MSK_IAM authentication:
1314 # no botocore available, will disable AWS_MSK_IAM mechanism
1415 BotoSession = None
1516
17+ from kafka .errors import KafkaConfigurationError
1618from kafka .sasl .abc import SaslMechanism
1719from kafka .vendor .six .moves import urllib
1820
1921
22+ log = logging .getLogger (__name__ )
23+
24+
2025class SaslMechanismAwsMskIam (SaslMechanism ):
2126 def __init__ (self , ** config ):
2227 assert BotoSession is not None , 'AWS_MSK_IAM requires the "botocore" package'
@@ -27,22 +32,28 @@ def __init__(self, **config):
2732 self ._is_done = False
2833 self ._is_authenticated = False
2934
30- def auth_bytes (self ):
35+ def _build_client (self ):
3136 session = BotoSession ()
3237 credentials = session .get_credentials ().get_frozen_credentials ()
33- client = AwsMskIamClient (
38+ if not session .get_config_variable ('region' ):
39+ raise KafkaConfigurationError ('Unable to determine region for AWS MSK cluster. Is AWS_DEFAULT_REGION set?' )
40+ return AwsMskIamClient (
3441 host = self .host ,
3542 access_key = credentials .access_key ,
3643 secret_key = credentials .secret_key ,
3744 region = session .get_config_variable ('region' ),
3845 token = credentials .token ,
3946 )
47+
48+ def auth_bytes (self ):
49+ client = self ._build_client ()
50+ log .debug ("Generating auth token for MSK scope: %s" , client ._scope )
4051 return client .first_message ()
4152
4253 def receive (self , auth_bytes ):
4354 self ._is_done = True
4455 self ._is_authenticated = auth_bytes != b''
45- self ._auth = auth_bytes .deode ('utf-8' )
56+ self ._auth = auth_bytes .decode ('utf-8' )
4657
4758 def is_done (self ):
4859 return self ._is_done
0 commit comments