4040#include "Zend/zend_exceptions.h"
4141#include "consumer_arginfo.h"
4242
43- typedef struct _object_intern {
44- rd_kafka_t * rk ;
45- kafka_conf_callbacks cbs ;
46- zend_object std ;
47- } object_intern ;
48-
4943static zend_class_entry * ce ;
5044static zend_object_handlers handlers ;
5145
5246static void kafka_consumer_free (zend_object * object ) /* {{{ */
5347{
54- object_intern * intern = php_kafka_from_obj (object_intern , object );
48+ kafka_object * intern = php_kafka_from_obj (kafka_object , object );
5549 rd_kafka_resp_err_t err ;
5650 kafka_conf_callbacks_dtor (& intern -> cbs );
5751
@@ -75,9 +69,9 @@ static void kafka_consumer_free(zend_object *object) /* {{{ */
7569static zend_object * kafka_consumer_new (zend_class_entry * class_type ) /* {{{ */
7670{
7771 zend_object * retval ;
78- object_intern * intern ;
72+ kafka_object * intern ;
7973
80- intern = ecalloc (1 , sizeof (object_intern )+ zend_object_properties_size (class_type ));
74+ intern = ecalloc (1 , sizeof (kafka_object )+ zend_object_properties_size (class_type ));
8175 zend_object_std_init (& intern -> std , class_type );
8276 object_properties_init (& intern -> std , class_type );
8377
@@ -88,18 +82,6 @@ static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
8882}
8983/* }}} */
9084
91- static object_intern * get_object (zval * zconsumer ) /* {{{ */
92- {
93- object_intern * oconsumer = Z_KAFKA_P (object_intern , zconsumer );
94-
95- if (!oconsumer -> rk ) {
96- zend_throw_exception_ex (NULL , 0 , "SimpleKafkaClient\\Consumer::__construct() has not been called" );
97- return NULL ;
98- }
99-
100- return oconsumer ;
101- } /* }}} */
102-
10385static int has_group_id (rd_kafka_conf_t * conf ) { /* {{{ */
10486
10587 size_t len ;
@@ -125,15 +107,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct)
125107 zval * zconf ;
126108 char errstr [512 ];
127109 rd_kafka_t * rk ;
128- object_intern * intern ;
110+ kafka_object * intern ;
129111 kafka_conf_object * conf_intern ;
130112 rd_kafka_conf_t * conf = NULL ;
131113
132114 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 1 , 1 )
133115 Z_PARAM_OBJECT_OF_CLASS (zconf , ce_kafka_conf )
134116 ZEND_PARSE_PARAMETERS_END ();
135117
136- intern = Z_KAFKA_P (object_intern , getThis ());
118+ intern = Z_KAFKA_P (kafka_object , getThis ());
137119
138120 conf_intern = get_kafka_conf_object (zconf );
139121 if (conf_intern ) {
@@ -175,7 +157,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
175157 HashTable * htopars = NULL ;
176158 rd_kafka_topic_partition_list_t * topics ;
177159 rd_kafka_resp_err_t err ;
178- object_intern * intern ;
160+ kafka_object * intern ;
179161
180162 if (zend_parse_parameters (ZEND_NUM_ARGS (), "|h!" , & htopars ) == FAILURE ) {
181163 return ;
@@ -186,7 +168,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
186168 Z_PARAM_ARRAY_HT (htopars )
187169 ZEND_PARSE_PARAMETERS_END ();
188170
189- intern = get_object (getThis ());
171+ intern = get_kafka_object (getThis ());
190172 if (!intern ) {
191173 return ;
192174 }
@@ -219,12 +201,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getAssignment)
219201{
220202 rd_kafka_resp_err_t err ;
221203 rd_kafka_topic_partition_list_t * topics ;
222- object_intern * intern ;
204+ kafka_object * intern ;
223205
224206 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 0 , 0 )
225207 ZEND_PARSE_PARAMETERS_END ();
226208
227- intern = get_object (getThis ());
209+ intern = get_kafka_object (getThis ());
228210 if (!intern ) {
229211 return ;
230212 }
@@ -247,7 +229,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
247229{
248230 HashTable * htopics ;
249231 HashPosition pos ;
250- object_intern * intern ;
232+ kafka_object * intern ;
251233 rd_kafka_topic_partition_list_t * topics ;
252234 rd_kafka_resp_err_t err ;
253235 zval * zv ;
@@ -256,7 +238,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
256238 Z_PARAM_ARRAY_HT (htopics )
257239 ZEND_PARSE_PARAMETERS_END ();
258240
259- intern = get_object (getThis ());
241+ intern = get_kafka_object (getThis ());
260242 if (!intern ) {
261243 return ;
262244 }
@@ -287,13 +269,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
287269{
288270 rd_kafka_resp_err_t err ;
289271 rd_kafka_topic_partition_list_t * topics ;
290- object_intern * intern ;
272+ kafka_object * intern ;
291273 int i ;
292274
293275 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 0 , 0 )
294276 ZEND_PARSE_PARAMETERS_END ();
295277
296- intern = get_object (getThis ());
278+ intern = get_kafka_object (getThis ());
297279 if (!intern ) {
298280 return ;
299281 }
@@ -319,13 +301,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
319301 Unsubscribe from the current subscription set */
320302ZEND_METHOD (SimpleKafkaClient_Consumer , unsubscribe )
321303{
322- object_intern * intern ;
304+ kafka_object * intern ;
323305 rd_kafka_resp_err_t err ;
324306
325307 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 0 , 0 )
326308 ZEND_PARSE_PARAMETERS_END ();
327309
328- intern = get_object (getThis ());
310+ intern = get_kafka_object (getThis ());
329311 if (!intern ) {
330312 return ;
331313 }
@@ -343,15 +325,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
343325 Consume message or get error event, triggers callbacks */
344326ZEND_METHOD (SimpleKafkaClient_Consumer , consume )
345327{
346- object_intern * intern ;
328+ kafka_object * intern ;
347329 zend_long timeout_ms ;
348330 rd_kafka_message_t * rkmessage , rkmessage_tmp = {0 };
349331
350332 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 1 , 1 )
351333 Z_PARAM_LONG (timeout_ms )
352334 ZEND_PARSE_PARAMETERS_END ();
353335
354- intern = get_object (getThis ());
336+ intern = get_kafka_object (getThis ());
355337 if (!intern ) {
356338 return ;
357339 }
@@ -374,7 +356,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
374356static void consumer_commit (int async , INTERNAL_FUNCTION_PARAMETERS ) /* {{{ */
375357{
376358 zval * zarg = NULL ;
377- object_intern * intern ;
359+ kafka_object * intern ;
378360 rd_kafka_topic_partition_list_t * offsets = NULL ;
379361 rd_kafka_resp_err_t err ;
380362
@@ -383,7 +365,7 @@ static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
383365 Z_PARAM_ZVAL (zarg )
384366 ZEND_PARSE_PARAMETERS_END ();
385367
386- intern = get_object (getThis ());
368+ intern = get_kafka_object (getThis ());
387369 if (!intern ) {
388370 return ;
389371 }
@@ -476,12 +458,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync)
476458 Close connection */
477459ZEND_METHOD (SimpleKafkaClient_Consumer , close )
478460{
479- object_intern * intern ;
461+ kafka_object * intern ;
480462
481463 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 0 , 0 )
482464 ZEND_PARSE_PARAMETERS_END ();
483465
484- intern = get_object (getThis ());
466+ intern = get_kafka_object (getThis ());
485467 if (!intern ) {
486468 return ;
487469 }
@@ -499,7 +481,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
499481 zval * only_zrkt = NULL ;
500482 zend_long timeout_ms ;
501483 rd_kafka_resp_err_t err ;
502- object_intern * intern ;
484+ kafka_object * intern ;
503485 const rd_kafka_metadata_t * metadata ;
504486 kafka_topic_object * only_orkt = NULL ;
505487
@@ -510,7 +492,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
510492 Z_PARAM_OBJECT_OF_CLASS (only_zrkt , ce_kafka_topic )
511493 ZEND_PARSE_PARAMETERS_END ();
512494
513- intern = get_object (getThis ());
495+ intern = get_kafka_object (getThis ());
514496 if (!intern ) {
515497 return ;
516498 }
@@ -540,14 +522,14 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle)
540522 char * topic ;
541523 size_t topic_len ;
542524 rd_kafka_topic_t * rkt ;
543- object_intern * intern ;
525+ kafka_object * intern ;
544526 kafka_topic_object * topic_intern ;
545527
546528 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 1 , 1 )
547529 Z_PARAM_STRING (topic , topic_len )
548530 ZEND_PARSE_PARAMETERS_END ();
549531
550- intern = get_object (getThis ());
532+ intern = get_kafka_object (getThis ());
551533 if (!intern ) {
552534 return ;
553535 }
@@ -577,7 +559,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
577559{
578560 HashTable * htopars = NULL ;
579561 zend_long timeout_ms ;
580- object_intern * intern ;
562+ kafka_object * intern ;
581563 rd_kafka_resp_err_t err ;
582564 rd_kafka_topic_partition_list_t * topics ;
583565
@@ -586,7 +568,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
586568 Z_PARAM_LONG (timeout_ms )
587569 ZEND_PARSE_PARAMETERS_END ();
588570
589- intern = get_object (getThis ());
571+ intern = get_kafka_object (getThis ());
590572 if (!intern ) {
591573 return ;
592574 }
@@ -615,15 +597,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
615597ZEND_METHOD (SimpleKafkaClient_Consumer , getOffsetPositions )
616598{
617599 HashTable * htopars = NULL ;
618- object_intern * intern ;
600+ kafka_object * intern ;
619601 rd_kafka_resp_err_t err ;
620602 rd_kafka_topic_partition_list_t * topics ;
621603
622604 ZEND_PARSE_PARAMETERS_START_EX (ZEND_PARSE_PARAMS_THROW , 1 , 1 )
623605 Z_PARAM_ARRAY_HT (htopars )
624606 ZEND_PARSE_PARAMETERS_END ();
625607
626- intern = get_object (getThis ());
608+ intern = get_kafka_object (getThis ());
627609 if (!intern ) {
628610 return ;
629611 }
@@ -650,7 +632,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
650632ZEND_METHOD (SimpleKafkaClient_Consumer , offsetsForTimes )
651633{
652634 HashTable * htopars = NULL ;
653- object_intern * intern ;
635+ kafka_object * intern ;
654636 rd_kafka_topic_partition_list_t * topicPartitions ;
655637 zend_long timeout_ms ;
656638 rd_kafka_resp_err_t err ;
@@ -660,7 +642,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
660642 Z_PARAM_LONG (timeout_ms )
661643 ZEND_PARSE_PARAMETERS_END ();
662644
663- intern = get_object (getThis ());
645+ intern = get_kafka_object (getThis ());
664646 if (!intern ) {
665647 return ;
666648 }
@@ -686,7 +668,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
686668 Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
687669ZEND_METHOD (SimpleKafkaClient_Consumer , queryWatermarkOffsets )
688670{
689- object_intern * intern ;
671+ kafka_object * intern ;
690672 char * topic ;
691673 size_t topic_length ;
692674 long low , high ;
@@ -705,7 +687,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
705687 ZVAL_DEREF (lowResult );
706688 ZVAL_DEREF (highResult );
707689
708- intern = get_object (getThis ());
690+ intern = get_kafka_object (getThis ());
709691 if (!intern ) {
710692 return ;
711693 }
@@ -732,5 +714,5 @@ void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */
732714
733715 handlers = kafka_default_object_handlers ;
734716 handlers .free_obj = kafka_consumer_free ;
735- handlers .offset = XtOffsetOf (object_intern , std );
717+ handlers .offset = XtOffsetOf (kafka_object , std );
736718}
0 commit comments