Skip to content

Commit 7458b6f

Browse files
remove tenant name from requests (#151)
1 parent 95459ae commit 7458b6f

File tree

4 files changed

+9
-60
lines changed

4 files changed

+9
-60
lines changed

memphis/consumer.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,7 @@ async def destroy(self):
178178
destroyConsumerReq = {
179179
"name": self.consumer_name,
180180
"station_name": self.station_name,
181-
"username": self.connection.username,
182-
"tenant_name": self.connection.tenant_name
181+
"username": self.connection.username
183182
}
184183
consumer_name = json.dumps(destroyConsumerReq, indent=2).encode("utf-8")
185184
res = await self.connection.broker_manager.request(

memphis/memphis.py

Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -92,50 +92,6 @@ async def sdk_client_updates_listener(self):
9292
except Exception as err:
9393
raise MemphisError(err)
9494

95-
async def get_tenant_name(self, account_id):
96-
try:
97-
getTenantNameReq = {
98-
"tenant_id": account_id
99-
}
100-
get_tenant_id_req_bytes = json.dumps(getTenantNameReq, indent=2).encode("utf-8")
101-
err_msg = await self.broker_manager.request(
102-
"$memphis_get_tenant_name", get_tenant_id_req_bytes, timeout=5
103-
)
104-
tenant_name_response = err_msg.data.decode("utf-8")
105-
tenant_name_response = json.loads(tenant_name_response)
106-
107-
if tenant_name_response['error'] != "":
108-
raise MemphisError(tenant_name_response['error'])
109-
110-
return tenant_name_response["tenant_name"]
111-
112-
except Exception as err:
113-
# for backward compatibility
114-
if err.__class__.__name__ == 'NoRespondersError':
115-
return self.MEMPHIS_GLOBAL_ACCOUNT_NAME
116-
else:
117-
raise MemphisError(err)
118-
119-
async def get_broker_manager_connection(self, connection_opts):
120-
if "user" in connection_opts:
121-
ping_connection_opts = connection_opts
122-
ping_connection_opts["allow_reconnect"] = False
123-
try:
124-
conn = await broker.connect(**ping_connection_opts)
125-
await conn.close()
126-
except Exception as ex:
127-
if "authorization violation" in str(ex).lower():
128-
try:
129-
ping_connection_opts["user"] = self.username
130-
conn = await broker.connect(**ping_connection_opts)
131-
await conn.close()
132-
connection_opts["user"] = self.username
133-
except Exception as ex1:
134-
raise ex1
135-
else:
136-
raise ex
137-
138-
return await broker.connect(**connection_opts)
13995

14096
async def connect(
14197
self,
@@ -216,7 +172,6 @@ async def connect(
216172
await self.sdk_client_updates_listener()
217173
self.broker_connection = self.broker_manager.jetstream()
218174
self.is_connection_active = True
219-
self.tenant_name = await self.get_tenant_name(self.account_id)
220175
except Exception as e:
221176
raise MemphisError(str(e))
222177

@@ -267,8 +222,7 @@ async def station(
267222
"Schemaverse": send_schema_failed_msg_to_dls,
268223
},
269224
"username": self.username,
270-
"tiered_storage_enabled": tiered_storage_enabled,
271-
"tenant_name": self.tenant_name
225+
"tiered_storage_enabled": tiered_storage_enabled
272226
}
273227
create_station_req_bytes = json.dumps(createStationReq, indent=2).encode(
274228
"utf-8"
@@ -299,7 +253,7 @@ async def attach_schema(self, name, stationName):
299253
try:
300254
if name == "" or stationName == "":
301255
raise MemphisError("name and station name can not be empty")
302-
msg = {"name": name, "station_name": stationName, "username": self.username, "tenant_name": self.tenant_name}
256+
msg = {"name": name, "station_name": stationName, "username": self.username}
303257
msgToSend = json.dumps(msg).encode("utf-8")
304258
err_msg = await self.broker_manager.request(
305259
"$memphis_schema_attachments", msgToSend, timeout=5
@@ -321,7 +275,7 @@ async def detach_schema(self, stationName):
321275
try:
322276
if stationName == "":
323277
raise MemphisError("station name is missing")
324-
msg = {"station_name": stationName, "username": self.username, "tenant_name": self.tenant_name}
278+
msg = {"station_name": stationName, "username": self.username}
325279
msgToSend = json.dumps(msg).encode("utf-8")
326280
err_msg = await self.broker_manager.request(
327281
"$memphis_schema_detachments", msgToSend, timeout=5
@@ -405,8 +359,7 @@ async def producer(
405359
"connection_id": self.connection_id,
406360
"producer_type": "application",
407361
"req_version": 1,
408-
"username": self.username,
409-
"tenant_name": self.tenant_name
362+
"username": self.username
410363
}
411364
create_producer_req_bytes = json.dumps(createProducerReq, indent=2).encode(
412365
"utf-8"
@@ -583,8 +536,7 @@ async def consumer(
583536
"start_consume_from_sequence": start_consume_from_sequence,
584537
"last_messages": last_messages,
585538
"req_version": 1,
586-
"username": self.username,
587-
"tenant_name": self.tenant_name
539+
"username": self.username
588540
}
589541

590542
create_consumer_req_bytes = json.dumps(createConsumerReq, indent=2).encode(

memphis/producer.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,7 @@ async def produce(
252252
"data": msgHex,
253253
"headers": headers,
254254
},
255-
"validation_error": str(e),
256-
"tenant_name": self.connection.tenant_name
255+
"validation_error": str(e)
257256
}
258257
buf = json.dumps(buf).encode("utf-8")
259258
await self.connection.broker_manager.publish("$memphis_schemaverse_dls", buf)
@@ -279,8 +278,7 @@ async def destroy(self):
279278
destroyProducerReq = {
280279
"name": self.producer_name,
281280
"station_name": self.station_name,
282-
"username": self.connection.username,
283-
"tenant_name": self.connection.tenant_name
281+
"username": self.connection.username
284282
}
285283

286284
producer_name = json.dumps(destroyProducerReq).encode("utf-8")

memphis/station.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def __init__(self, connection, name: str):
1212
async def destroy(self):
1313
"""Destroy the station."""
1414
try:
15-
nameReq = {"station_name": self.name, "username": self.connection.username, "tenant_name": self.connection.tenant_name}
15+
nameReq = {"station_name": self.name, "username": self.connection.username}
1616
station_name = json.dumps(nameReq, indent=2).encode("utf-8")
1717
res = await self.connection.broker_manager.request(
1818
"$memphis_station_destructions", station_name, timeout=5

0 commit comments

Comments
 (0)