Skip to content

Commit 4385d41

Browse files
add account_name in connect method for ListenForSchemaverseDlsEvents (#145)
* add account_name in connect method for ListenForSchemaverseDlsEvents * add get_tenant_name method * support back compatibility in get_tenant_name * fix issue
1 parent 137eeb2 commit 4385d41

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

memphis/memphis.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from graphql import validate as validate_graphql
3333
from jsonschema import validate
3434
from memphis.consumer import Consumer
35-
from memphis.exceptions import MemphisConnectError, MemphisError, MemphisHeaderError
35+
from memphis.exceptions import MemphisConnectError, MemphisError, MemphisHeaderError, MemphisSchemaError
3636
from memphis.headers import Headers
3737
from memphis.producer import Producer
3838
from memphis.station import Station
@@ -42,6 +42,7 @@
4242

4343
class Memphis:
4444
MAX_BATCH_SIZE = 5000
45+
MEMPHIS_GLOBAL_ACCOUNT_NAME = "$memphis"
4546
def __init__(self):
4647
self.is_connection_active = False
4748
self.schema_updates_data = {}
@@ -90,6 +91,31 @@ async def sdk_client_updates_listener(self):
9091
self.configuration_tasks = task
9192
except Exception as err:
9293
raise MemphisError(err)
94+
95+
async def get_tenant_name(self, account_id):
96+
try:
97+
getTenantNameReq = {
98+
"tenant_id": account_id
99+
}
100+
get_tenant_id_req_bytes = json.dumps(getTenantNameReq, indent=2).encode("utf-8")
101+
err_msg = await self.broker_manager.request(
102+
"$memphis_get_tenant_name", get_tenant_id_req_bytes, timeout=5
103+
)
104+
tenant_name_response = err_msg.data.decode("utf-8")
105+
tenant_name_response = json.loads(tenant_name_response)
106+
107+
if tenant_name_response['error'] != "":
108+
raise MemphisError(tenant_name_response['error'])
109+
110+
return tenant_name_response["tenant_name"]
111+
112+
except Exception as err:
113+
# for backward compatibility
114+
if err.__class__.__name__ == 'NoRespondersError':
115+
return self.MEMPHIS_GLOBAL_ACCOUNT_NAME
116+
else:
117+
raise MemphisError(err)
118+
93119

94120
async def connect(
95121
self,
@@ -170,6 +196,7 @@ async def connect(
170196
await self.sdk_client_updates_listener()
171197
self.broker_connection = self.broker_manager.jetstream()
172198
self.is_connection_active = True
199+
self.tenant_name = await self.get_tenant_name(self.account_id)
173200
except Exception as e:
174201
raise MemphisError(str(e))
175202

memphis/producer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ async def produce(
226226
"$memphis_connectionId": self.connection.connection_id,
227227
}
228228

229-
if headers != {}:
229+
if headers != {} and not headers == None:
230230
headers = headers.headers
231231
headers.update(memphis_headers)
232232
else:
@@ -246,6 +246,7 @@ async def produce(
246246
"headers": headers,
247247
},
248248
"validation_error": str(e),
249+
"tenant_name": self.connection.tenant_name
249250
}
250251
buf = json.dumps(buf).encode("utf-8")
251252
await self.connection.broker_manager.publish("$memphis_schemaverse_dls", buf)

0 commit comments

Comments
 (0)