1111#include < vector>
1212
1313#include " src/kafka-consumer.h"
14+ #include " src/per-isolate-data.h"
1415#include " src/workers.h"
1516
1617using Nan::FunctionCallbackInfo;
@@ -36,6 +37,10 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
3637 m_consume_loop = nullptr ;
3738 }
3839
40+ void KafkaConsumer::delete_instance (void * arg) {
41+ delete (static_cast <KafkaConsumer*>(arg));
42+ }
43+
3944KafkaConsumer::~KafkaConsumer () {
4045 // We only want to run this if it hasn't been run already
4146 Disconnect ();
@@ -558,8 +563,6 @@ std::string KafkaConsumer::RebalanceProtocol() {
558563 return consumer->rebalance_protocol ();
559564}
560565
561- Nan::Persistent<v8::Function> KafkaConsumer::constructor;
562-
563566void KafkaConsumer::Init (v8::Local<v8::Object> exports) {
564567 Nan::HandleScope scope;
565568
@@ -620,7 +623,8 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
620623 Nan::SetPrototypeMethod (tpl, " commitSync" , NodeCommitSync);
621624 Nan::SetPrototypeMethod (tpl, " offsetsStore" , NodeOffsetsStore);
622625
623- constructor.Reset ((tpl->GetFunction (Nan::GetCurrentContext ()))
626+ PerIsolateData::For (v8::Isolate::GetCurrent ())->KafkaConsumerConstructor ()
627+ .Reset ((tpl->GetFunction (Nan::GetCurrentContext ()))
624628 .ToLocalChecked ());
625629 Nan::Set (exports, Nan::New (" KafkaConsumer" ).ToLocalChecked (),
626630 (tpl->GetFunction (Nan::GetCurrentContext ())).ToLocalChecked ());
@@ -680,7 +684,8 @@ v8::Local<v8::Object> KafkaConsumer::NewInstance(v8::Local<v8::Value> arg) {
680684 const unsigned argc = 1 ;
681685
682686 v8::Local<v8::Value> argv[argc] = { arg };
683- v8::Local<v8::Function> cons = Nan::New<v8::Function>(constructor);
687+ v8::Local<v8::Function> cons = Nan::New<v8::Function>(
688+ PerIsolateData::For (v8::Isolate::GetCurrent ())->KafkaConsumerConstructor ());
684689 v8::Local<v8::Object> instance =
685690 Nan::NewInstance (cons, argc, argv).ToLocalChecked ();
686691
@@ -1395,7 +1400,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
13951400 // cleanup the async worker
13961401 consumeLoop->WorkComplete ();
13971402 consumeLoop->Destroy ();
1398-
1403+
13991404 consumer->m_consume_loop = nullptr ;
14001405 }
14011406
0 commit comments