@@ -168,12 +168,12 @@ const CompatibilityErrorMessages = Object.freeze({
168168 brokerString : ( ) =>
169169 "The 'brokers' property must be an array of strings.\n" +
170170 "For example: ['kafka:9092', 'kafka2:9093']\n" ,
171- saslOauthbearerUnsupported : ( ) =>
172- "SASL mechanism OAUTHBEARER is not supported yet." ,
173171 saslUnsupportedMechanism : ( mechanism ) =>
174172 `SASL mechanism ${ mechanism } is not supported.` ,
175173 saslUsernamePasswordString : ( mechanism ) =>
176174 `The 'sasl.username' and 'sasl.password' properties must be strings and must be present for the mechanism ${ mechanism } .` ,
175+ saslOauthBearerProvider : ( ) =>
176+ `The 'oauthBearerProvider' property must be a function.` ,
177177 sslObject : ( ) =>
178178 "The 'ssl' property must be a boolean. Any additional configuration must be provided outside the kafkaJS block.\n" +
179179 "Before: \n" +
@@ -280,27 +280,65 @@ function kafkaJSToRdKafkaConfig(config) {
280280 const mechanism = sasl . mechanism . toUpperCase ( ) ;
281281
282282 if ( mechanism === 'OAUTHBEARER' ) {
283- throw new error . KafkaJSError ( CompatibilityErrorMessages . saslOauthbearerUnsupported ( ) , {
284- code : error . ErrorCodes . ERR__NOT_IMPLEMENTED ,
285- } ) ;
286- }
287-
288- /* The mechanism must be PLAIN or SCRAM. */
289- if ( mechanism !== 'PLAIN' && ! mechanism . startsWith ( 'SCRAM' ) ) {
283+ rdkafkaConfig [ "sasl.mechanism" ] = mechanism ;
284+ if ( Object . hasOwn ( sasl , "oauthBearerProvider" ) ) {
285+ if ( typeof sasl . oauthBearerProvider !== 'function' ) {
286+ throw new error . KafkaJSError ( CompatibilityErrorMessages . saslOauthBearerProvider ( ) , {
287+ code : error . ErrorCodes . ERR__INVALID_ARG ,
288+ } ) ;
289+ }
290+ rdkafkaConfig [ 'oauthbearer_token_refresh_cb' ] = function ( oauthbearer_config ) {
291+ return sasl . oauthBearerProvider ( oauthbearer_config )
292+ . then ( ( token ) => {
293+ if ( ! Object . hasOwn ( token , 'value' ) ) {
294+ throw new error . KafkaJSError ( 'Token must have a value property.' , {
295+ code : error . ErrorCodes . ERR__INVALID_ARG ,
296+ } ) ;
297+ } else if ( ! Object . hasOwn ( token , 'principal' ) ) {
298+ throw new error . KafkaJSError ( 'Token must have a principal property.' , {
299+ code : error . ErrorCodes . ERR__INVALID_ARG ,
300+ } ) ;
301+ } else if ( ! Object . hasOwn ( token , 'lifetime' ) ) {
302+ throw new error . KafkaJSError ( 'Token must have a lifetime property.' , {
303+ code : error . ErrorCodes . ERR__INVALID_ARG ,
304+ } ) ;
305+ }
306+
307+ // Recast token into a value expected by node-rdkafka's callback.
308+ const setToken = {
309+ tokenValue : token . value ,
310+ extensions : token . extensions ,
311+ principal : token . principal ,
312+ lifetime : token . lifetime ,
313+ } ;
314+ return setToken ;
315+ } )
316+ . catch ( err => {
317+ if ( ! ( err instanceof Error ) ) {
318+ err = new Error ( err ) ;
319+ }
320+ throw err ;
321+ } ) ;
322+ }
323+ }
324+ /* It's a valid case (unlike in KafkaJS) for oauthBearerProvider to be
325+ * null, because librdkafka provides an unsecured token provider for
326+ * non-prod usecases. So don't do anything in that case. */
327+ } else if ( mechanism === 'PLAIN' || mechanism . startsWith ( 'SCRAM' ) ) {
328+ if ( typeof sasl . username !== "string" || typeof sasl . password !== "string" ) {
329+ throw new error . KafkaJSError ( CompatibilityErrorMessages . saslUsernamePasswordString ( mechanism ) , {
330+ code : error . ErrorCodes . ERR__INVALID_ARG ,
331+ } ) ;
332+ }
333+ rdkafkaConfig [ "sasl.mechanism" ] = mechanism ;
334+ rdkafkaConfig [ "sasl.username" ] = sasl . username ;
335+ rdkafkaConfig [ "sasl.password" ] = sasl . password ;
336+ } else {
290337 throw new error . KafkaJSError ( CompatibilityErrorMessages . saslUnsupportedMechanism ( mechanism ) , {
291338 code : error . ErrorCodes . ERR__INVALID_ARG ,
292339 } ) ;
293340 }
294341
295- if ( typeof sasl . username !== "string" || typeof sasl . password !== "string" ) {
296- throw new error . KafkaJSError ( CompatibilityErrorMessages . saslUsernamePasswordString ( mechanism ) , {
297- code : error . ErrorCodes . ERR__INVALID_ARG ,
298- } ) ;
299- }
300-
301- rdkafkaConfig [ "sasl.mechanism" ] = mechanism ;
302- rdkafkaConfig [ "sasl.username" ] = sasl . username ;
303- rdkafkaConfig [ "sasl.password" ] = sasl . password ;
304342 withSASL = true ;
305343 }
306344
0 commit comments