Skip to content

Commit 33ae249

Browse files
Add tenant name to requests (#148)
* add tenant name to requests * update the timeout to 2 ms
1 parent 3e48d97 commit 33ae249

File tree

4 files changed

+11
-5
lines changed

4 files changed

+11
-5
lines changed

memphis/consumer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
self.dls_current_index = 0
4646
self.dls_callback_func = None
4747
self.t_dls = asyncio.create_task(self.__consume_dls())
48+
self.t_consume = None
4849

4950
def set_context(self, context):
5051
"""Set a context (dict) that will be passed to each message handler call."""
@@ -178,6 +179,7 @@ async def destroy(self):
178179
"name": self.consumer_name,
179180
"station_name": self.station_name,
180181
"username": self.connection.username,
182+
"tenant_name": self.connection.tenant_name
181183
}
182184
consumer_name = json.dumps(destroyConsumerReq, indent=2).encode("utf-8")
183185
res = await self.connection.broker_manager.request(

memphis/memphis.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ async def connect(
128128
reconnect: bool = True,
129129
max_reconnect: int = 10,
130130
reconnect_interval_ms: int = 1500,
131-
timeout_ms: int = 15000,
131+
timeout_ms: int = 2000,
132132
cert_file: str = "",
133133
key_file: str = "",
134134
ca_file: str = "",
@@ -248,6 +248,7 @@ async def station(
248248
},
249249
"username": self.username,
250250
"tiered_storage_enabled": tiered_storage_enabled,
251+
"tenant_name": self.tenant_name
251252
}
252253
create_station_req_bytes = json.dumps(createStationReq, indent=2).encode(
253254
"utf-8"
@@ -278,7 +279,7 @@ async def attach_schema(self, name, stationName):
278279
try:
279280
if name == "" or stationName == "":
280281
raise MemphisError("name and station name can not be empty")
281-
msg = {"name": name, "station_name": stationName, "username": self.username}
282+
msg = {"name": name, "station_name": stationName, "username": self.username, "tenant_name": self.tenant_name}
282283
msgToSend = json.dumps(msg).encode("utf-8")
283284
err_msg = await self.broker_manager.request(
284285
"$memphis_schema_attachments", msgToSend, timeout=5
@@ -300,7 +301,7 @@ async def detach_schema(self, stationName):
300301
try:
301302
if stationName == "":
302303
raise MemphisError("station name is missing")
303-
msg = {"station_name": stationName, "username": self.username}
304+
msg = {"station_name": stationName, "username": self.username, "tenant_name": self.tenant_name}
304305
msgToSend = json.dumps(msg).encode("utf-8")
305306
err_msg = await self.broker_manager.request(
306307
"$memphis_schema_detachments", msgToSend, timeout=5
@@ -385,6 +386,7 @@ async def producer(
385386
"producer_type": "application",
386387
"req_version": 1,
387388
"username": self.username,
389+
"tenant_name": self.tenant_name
388390
}
389391
create_producer_req_bytes = json.dumps(createProducerReq, indent=2).encode(
390392
"utf-8"
@@ -417,7 +419,7 @@ async def producer(
417419
if self.schema_updates_data[internal_station_name]["type"] == "json":
418420
schema = self.schema_updates_data[internal_station_name][
419421
"active_version"
420-
]["schema_content"]
422+
]["schema_content"]
421423
self.json_schemas[internal_station_name] = json.loads(schema)
422424
elif (
423425
self.schema_updates_data[internal_station_name]["type"] == "graphql"
@@ -562,6 +564,7 @@ async def consumer(
562564
"last_messages": last_messages,
563565
"req_version": 1,
564566
"username": self.username,
567+
"tenant_name": self.tenant_name
565568
}
566569

567570
create_consumer_req_bytes = json.dumps(createConsumerReq, indent=2).encode(

memphis/producer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ async def destroy(self):
273273
"name": self.producer_name,
274274
"station_name": self.station_name,
275275
"username": self.connection.username,
276+
"tenant_name": self.connection.tenant_name
276277
}
277278

278279
producer_name = json.dumps(destroyProducerReq).encode("utf-8")

memphis/station.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def __init__(self, connection, name: str):
1212
async def destroy(self):
1313
"""Destroy the station."""
1414
try:
15-
nameReq = {"station_name": self.name, "username": self.connection.username}
15+
nameReq = {"station_name": self.name, "username": self.connection.username, "tenant_name": self.connection.tenant_name}
1616
station_name = json.dumps(nameReq, indent=2).encode("utf-8")
1717
res = await self.connection.broker_manager.request(
1818
"$memphis_station_destructions", station_name, timeout=5

0 commit comments

Comments
 (0)