Skip to content

Commit fabe9cc

Browse files
Produce without producer (#112)
* added produce without creating producer * fix destroy functions + add is_connected func * internal station name + producer name to lower * fix is connected * rearrange args places in call for produce * fix produce args + add is_connected to readme * add real name for producer * real_name to lower fix + remove duplicate lines --------- Co-authored-by: shay23b <shay@memphis.dev>
1 parent 89a20a3 commit fabe9cc

File tree

1 file changed

+6
-8
lines changed

1 file changed

+6
-8
lines changed

memphis/memphis.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -325,14 +325,13 @@ async def producer(self, station_name: str, producer_name: str, generate_random_
325325
generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name
326326
Raises:
327327
Exception: _description_
328-
Exception: _description_
329328
Returns:
330329
_type_: _description_
331330
"""
332331
try:
333332
if not self.is_connection_active:
334333
raise MemphisError("Connection is dead")
335-
334+
real_name = producer_name.lower()
336335
if generate_random_suffix:
337336
producer_name = self.__generateRandomSuffix(producer_name)
338337
createProducerReq = {
@@ -366,8 +365,8 @@ async def producer(self, station_name: str, producer_name: str, generate_random_
366365
elif self.schema_updates_data[station_name_internal]['type'] == "graphql":
367366
self.graphql_schemas[station_name_internal] = build_graphql_schema(
368367
self.schema_updates_data[station_name_internal]['active_version']['schema_content'])
369-
producer = Producer(self, producer_name, station_name)
370-
map_key = station_name_internal+"_"+producer_name.lower()
368+
producer = Producer(self, producer_name, station_name, real_name)
369+
map_key = station_name_internal+"_"+real_name
371370
self.producers_map[map_key] = producer
372371
return producer
373372

@@ -510,7 +509,6 @@ async def produce(self, station_name: str, producer_name: str, message, generate
510509
producer = self.producers_map[map_key]
511510
else:
512511
producer = await self.producer(station_name=station_name, producer_name=producer_name, generate_random_suffix=generate_random_suffix)
513-
self.producers_map[map_key] = producer
514512
await producer.produce(message=message, ack_wait_sec=ack_wait_sec, headers=headers, async_produce= async_produce, msg_id=msg_id)
515513
except Exception as e:
516514
raise MemphisError(str(e)) from e
@@ -567,12 +565,13 @@ def get_internal_name(name: str) -> str:
567565

568566

569567
class Producer:
570-
def __init__(self, connection, producer_name: str, station_name: str):
568+
def __init__(self, connection, producer_name: str, station_name: str, real_name: str):
571569
self.connection = connection
572570
self.producer_name = producer_name.lower()
573571
self.station_name = station_name
574572
self.internal_station_name = get_internal_name(self.station_name)
575573
self.loop = asyncio.get_running_loop()
574+
self.real_name = real_name
576575

577576
async def validate_msg(self, message):
578577
if self.connection.schema_updates_data[self.internal_station_name] != {}:
@@ -677,7 +676,6 @@ async def produce(self, message, ack_wait_sec: int = 15, headers: Union[Headers,
677676
msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency
678677
Raises:
679678
Exception: _description_
680-
Exception: _description_
681679
"""
682680
try:
683681
message = await self.validate_msg(message)
@@ -787,7 +785,7 @@ async def destroy(self):
787785
if sub is not None:
788786
await sub.unsubscribe()
789787

790-
map_key = station_name_internal+"_"+self.producer_name.lower()
788+
map_key = station_name_internal+"_"+self.real_name
791789
del self.connection.producers_map[map_key]
792790

793791
except Exception as e:

0 commit comments

Comments
 (0)