@@ -68,6 +68,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
6868 cbs -> offset_commit = NULL ;
6969 kafka_conf_callback_dtor (cbs -> log );
7070 cbs -> log = NULL ;
71+ kafka_conf_callback_dtor (cbs -> oauthbearer_refresh );
72+ cbs -> oauthbearer_refresh = NULL ;
7173} /* }}} */
7274
7375static void kafka_conf_callback_copy (kafka_conf_callback * * to , kafka_conf_callback * from ) /* {{{ */
@@ -87,6 +89,7 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f
8789 kafka_conf_callback_copy (& to -> stats , from -> stats );
8890 kafka_conf_callback_copy (& to -> offset_commit , from -> offset_commit );
8991 kafka_conf_callback_copy (& to -> log , from -> log );
92+ kafka_conf_callback_copy (& to -> oauthbearer_refresh , from -> oauthbearer_refresh );
9093} /* }}} */
9194
9295static void kafka_conf_free (zend_object * object ) /* {{{ */
@@ -304,6 +307,33 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil
304307 zval_ptr_dtor (& args [3 ]);
305308}
306309
310+ static void kafka_conf_oauthbearer_token_refresh_cb (rd_kafka_t * rk , const char * oauthbearer_config , void * opaque )
311+ {
312+ kafka_conf_callbacks * cbs = (kafka_conf_callbacks * ) opaque ;
313+ zval args [2 ];
314+
315+ if (!opaque ) {
316+ return ;
317+ }
318+
319+ if (!cbs -> oauthbearer_refresh ) {
320+ return ;
321+ }
322+
323+ ZVAL_NULL (& args [0 ]);
324+ ZVAL_NULL (& args [1 ]);
325+
326+ ZVAL_ZVAL (& args [0 ], & cbs -> zrk , 1 , 0 );
327+ if (oauthbearer_config ) {
328+ ZVAL_STRING (& args [1 ], oauthbearer_config );
329+ }
330+
331+ kafka_call_function (& cbs -> oauthbearer_refresh -> fci , & cbs -> oauthbearer_refresh -> fcc , NULL , 2 , args );
332+
333+ zval_ptr_dtor (& args [0 ]);
334+ zval_ptr_dtor (& args [1 ]);
335+ }
336+
307337/* {{{ proto SimpleKafkaClient\Configuration::__construct() */
308338ZEND_METHOD (SimpleKafkaClient_Configuration , __construct )
309339{
@@ -579,6 +609,38 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb)
579609}
580610/* }}} */
581611
612+ /* {{{ proto void SimpleKafkaClient\Configuration::setOAuthBearerTokenRefreshCb(callable $callback)
613+ Sets the OAuthBearer token refresh callback */
614+ ZEND_METHOD (SimpleKafkaClient_Configuration , setOAuthBearerTokenRefreshCb )
615+ {
616+ zend_fcall_info fci ;
617+ zend_fcall_info_cache fcc ;
618+ kafka_conf_object * intern ;
619+
620+ ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 1 , 1 )
621+ Z_PARAM_FUNC (fci , fcc )
622+ ZEND_PARSE_PARAMETERS_END ();
623+
624+ intern = get_kafka_conf_object (getThis ());
625+ if (!intern ) {
626+ return ;
627+ }
628+
629+ Z_ADDREF_P (& fci .function_name );
630+
631+ if (intern -> cbs .oauthbearer_refresh ) {
632+ zval_ptr_dtor (& intern -> cbs .oauthbearer_refresh -> fci .function_name );
633+ } else {
634+ intern -> cbs .oauthbearer_refresh = ecalloc (1 , sizeof (* intern -> cbs .oauthbearer_refresh ));
635+ }
636+
637+ intern -> cbs .oauthbearer_refresh -> fci = fci ;
638+ intern -> cbs .oauthbearer_refresh -> fcc = fcc ;
639+
640+ rd_kafka_conf_set_oauthbearer_token_refresh_cb (intern -> conf , kafka_conf_oauthbearer_token_refresh_cb );
641+ }
642+ /* }}} */
643+
582644void kafka_conf_init (INIT_FUNC_ARGS )
583645{
584646 zend_class_entry tmpce ;
0 commit comments