@@ -81,17 +81,8 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
8181 return NULL ;
8282 }
8383 } else {
84- v8::Local<v8::Function> cb = value.As <v8::Function>();
85- rdconf->ConfigureCallback (string_key, cb, true , errstr);
86- if (!errstr.empty ()) {
87- delete rdconf;
88- return NULL ;
89- }
90- rdconf->ConfigureCallback (string_key, cb, false , errstr);
91- if (!errstr.empty ()) {
92- delete rdconf;
93- return NULL ;
94- }
84+ // Do nothing - Connection::NodeConfigureCallbacks will handle this for each
85+ // of the three client types.
9586 }
9687 }
9788
@@ -100,56 +91,76 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
10091
10192void Conf::ConfigureCallback (const std::string &string_key, const v8::Local<v8::Function> &cb, bool add, std::string &errstr) {
10293 if (string_key.compare (" rebalance_cb" ) == 0 ) {
94+ NodeKafka::Callbacks::Rebalance *rebalance = rebalance_cb ();
10395 if (add) {
104- if (this ->m_rebalance_cb == NULL ) {
105- this ->m_rebalance_cb = new NodeKafka::Callbacks::Rebalance ();
96+ if (rebalance == NULL ) {
97+ rebalance = new NodeKafka::Callbacks::Rebalance ();
98+ this ->set (string_key, rebalance, errstr);
10699 }
107- this -> m_rebalance_cb ->dispatcher .AddCallback (cb);
108- this ->set (string_key, this -> m_rebalance_cb , errstr);
100+ rebalance ->dispatcher .AddCallback (cb);
101+ this ->set (string_key, rebalance , errstr);
109102 } else {
110- if (this ->m_rebalance_cb != NULL ) {
111- this ->m_rebalance_cb ->dispatcher .RemoveCallback (cb);
103+ if (rebalance == NULL ) {
104+ rebalance->dispatcher .RemoveCallback (cb);
105+ this ->set (string_key, rebalance, errstr);
112106 }
113107 }
114108 } else if (string_key.compare (" offset_commit_cb" ) == 0 ) {
109+ NodeKafka::Callbacks::OffsetCommit *offset_commit = offset_commit_cb ();
115110 if (add) {
116- if (this ->m_offset_commit_cb == NULL ) {
117- this ->m_offset_commit_cb = new NodeKafka::Callbacks::OffsetCommit ();
111+ if (offset_commit == NULL ) {
112+ offset_commit = new NodeKafka::Callbacks::OffsetCommit ();
113+ this ->set (string_key, offset_commit, errstr);
118114 }
119- this ->m_offset_commit_cb ->dispatcher .AddCallback (cb);
120- this ->set (string_key, this ->m_offset_commit_cb , errstr);
115+ offset_commit->dispatcher .AddCallback (cb);
121116 } else {
122- if (this -> m_offset_commit_cb != NULL ) {
123- this -> m_offset_commit_cb ->dispatcher .RemoveCallback (cb);
117+ if (offset_commit != NULL ) {
118+ offset_commit ->dispatcher .RemoveCallback (cb);
124119 }
125120 }
126121 }
127122}
128123
129124void Conf::listen () {
130- if (m_rebalance_cb) {
131- m_rebalance_cb->dispatcher .Activate ();
125+ NodeKafka::Callbacks::Rebalance *rebalance = rebalance_cb ();
126+ if (rebalance) {
127+ rebalance->dispatcher .Activate ();
132128 }
133129
134- if (m_offset_commit_cb) {
135- m_offset_commit_cb->dispatcher .Activate ();
130+ NodeKafka::Callbacks::OffsetCommit *offset_commit = offset_commit_cb ();
131+ if (offset_commit) {
132+ offset_commit->dispatcher .Activate ();
136133 }
137134}
138135
139136void Conf::stop () {
140- if (m_rebalance_cb) {
141- m_rebalance_cb->dispatcher .Deactivate ();
137+ NodeKafka::Callbacks::Rebalance *rebalance = rebalance_cb ();
138+ if (rebalance) {
139+ rebalance->dispatcher .Deactivate ();
140+ }
141+
142+ NodeKafka::Callbacks::OffsetCommit *offset_commit = offset_commit_cb ();
143+ if (offset_commit) {
144+ offset_commit->dispatcher .Deactivate ();
142145 }
146+ }
147+
148+ Conf::~Conf () {}
143149
144- if (m_offset_commit_cb) {
145- m_offset_commit_cb->dispatcher .Deactivate ();
150+ NodeKafka::Callbacks::Rebalance* Conf::rebalance_cb () const {
151+ RdKafka::RebalanceCb *cb = NULL ;
152+ if (this ->get (cb) != RdKafka::Conf::CONF_OK) {
153+ return NULL ;
146154 }
155+ return static_cast <NodeKafka::Callbacks::Rebalance*>(cb);
147156}
148157
149- Conf::~Conf () {
150- if (m_rebalance_cb) {
151- delete m_rebalance_cb;
158+ NodeKafka::Callbacks::OffsetCommit* Conf::offset_commit_cb () const {
159+ RdKafka::OffsetCommitCb *cb = NULL ;
160+ if (this ->get (cb) != RdKafka::Conf::CONF_OK) {
161+ return NULL ;
152162 }
163+ return static_cast <NodeKafka::Callbacks::OffsetCommit*>(cb);
153164}
154165
155166} // namespace NodeKafka
0 commit comments