File tree Expand file tree Collapse file tree 1 file changed +21
-1
lines changed Expand file tree Collapse file tree 1 file changed +21
-1
lines changed Original file line number Diff line number Diff line change @@ -116,6 +116,26 @@ async def get_tenant_name(self, account_id):
116116 else :
117117 raise MemphisError (err )
118118
119+ async def get_broker_manager_connection (self , connection_opts ):
120+ if "user" in connection_opts :
121+ ping_connection_opts = connection_opts
122+ ping_connection_opts ["allow_reconnect" ] = False
123+ try :
124+ conn = await broker .connect (** ping_connection_opts )
125+ await conn .close ()
126+ except Exception as ex :
127+ if "authorization violation" in str (ex ).lower ():
128+ try :
129+ ping_connection_opts ["user" ] = self .username
130+ conn = await broker .connect (** ping_connection_opts )
131+ await conn .close ()
132+ connection_opts ["user" ] = self .username
133+ except Exception as ex1 :
134+ raise ex1
135+ else :
136+ raise ex
137+
138+ return await broker .connect (** connection_opts )
119139
120140 async def connect (
121141 self ,
@@ -192,7 +212,7 @@ async def connect(
192212 connection_opts ["user" ]= self .username + "$" + str (self .account_id )
193213 connection_opts ["password" ]= self .password
194214
195- self .broker_manager = await broker . connect ( ** connection_opts )
215+ self .broker_manager = await self . get_broker_manager_connection ( connection_opts )
196216 await self .sdk_client_updates_listener ()
197217 self .broker_connection = self .broker_manager .jetstream ()
198218 self .is_connection_active = True
You can’t perform that action at this time.
0 commit comments