@@ -288,6 +288,59 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure)
288288}
289289/* }}} */
290290
291+ /* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null)
292+ Set SASL/OAUTHBEARER token and metadata. */
293+ ZEND_METHOD (SimpleKafkaClient_Kafka , setOAuthBearerToken )
294+ {
295+ zend_long lifetime_ms ;
296+ const char * * extensions = NULL ;
297+ char * header_key , * header_value , * token , * principal_name , * errstr = NULL ;
298+ size_t token_len , principal_name_len , errstr_size = 0 , extension_size = 0 ;
299+ kafka_object * intern ;
300+ rd_kafka_resp_err_t err ;
301+ HashTable * ht_extensions = NULL ;
302+ HashPosition extensionsPos ;
303+ zval * z_header_value ;
304+
305+ ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 3 , 4 )
306+ Z_PARAM_STRING (token , token_len )
307+ Z_PARAM_LONG (lifetime_ms )
308+ Z_PARAM_STRING (principal_name , principal_name_len )
309+ Z_PARAM_OPTIONAL
310+ Z_PARAM_ARRAY_HT_OR_NULL (ht_extensions )
311+ ZEND_PARSE_PARAMETERS_END ();
312+
313+ intern = get_kafka_object (getThis ());
314+ if (!intern ) {
315+ return ;
316+ }
317+
318+ if (ht_extensions ) {
319+ for (zend_hash_internal_pointer_reset_ex (ht_extensions , & extensionsPos );
320+ (z_header_value = zend_hash_get_current_data_ex (ht_extensions , & extensionsPos )) != NULL &&
321+ (header_key = kafka_hash_get_current_key_ex (ht_extensions , & extensionsPos )) != NULL ;
322+ zend_hash_move_forward_ex (ht_extensions , & extensionsPos )) {
323+ convert_to_string_ex (z_header_value );
324+ extensions = realloc (extensions , (extension_size + 1 ) * sizeof (header_key ));
325+ extensions [extension_size ] = header_key ;
326+ header_value = Z_STRVAL_P (z_header_value );
327+ extensions = realloc (extensions , (extension_size + 2 ) * sizeof (header_value ));
328+ extensions [extension_size + 1 ] = Z_STRVAL_P (z_header_value );
329+ extension_size += 2 ;
330+ }
331+ }
332+
333+ err = rd_kafka_oauthbearer_set_token (intern -> rk , token , lifetime_ms , principal_name , extensions , extension_size , errstr , errstr_size );
334+
335+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
336+ zend_throw_exception (ce_kafka_exception , rd_kafka_err2str (err ), err );
337+ return ;
338+ }
339+
340+ free (extensions );
341+ }
342+ /* }}} */
343+
291344#define COPY_CONSTANT (name ) \
292345 REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT)
293346
0 commit comments