Skip to content

Commit ac8ae41

Browse files
authored
added produce without creating producer (#110)
* added produce without creating producer * fix destroy functions + add is_connected func * internal station name + producer name to lower * fix is connected * rearrange args places in call for produce * fix produce args + add is_connected to readme
1 parent 599cba2 commit ac8ae41

File tree

2 files changed

+77
-17
lines changed

2 files changed

+77
-17
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,22 @@ producer = await memphis.producer(station_name="<station-name>", producer_name="
197197
```
198198

199199
### Producing a message
200+
Without creating a producer.
201+
In cases where extra performance is needed the recommended way is to create a producer first
202+
and produce messages by using the produce function of it
203+
```python
204+
await memphis.produce(station_name='test_station_py', producer_name='prod_py',
205+
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
206+
generate_random_suffix=False, #defaults to false
207+
ack_wait_sec=15, # defaults to 15
208+
headers=headers, # default to {}
209+
async_produce=False, #defaults to false
210+
msg_id="123"
211+
)
212+
```
213+
200214

215+
Creating a producer first
201216
```python
202217
await prod.produce(
203218
message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
@@ -306,3 +321,10 @@ sequence_number = msg.get_sequence_number()
306321
```python
307322
consumer.destroy()
308323
```
324+
325+
326+
### Check if broker is connected
327+
328+
```python
329+
memphis.is_connected()
330+
```

memphis/memphis.py

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,24 @@ def func_wrapper():
4848
def cancel(self):
4949
self.t.cancel()
5050

51+
class Headers:
52+
def __init__(self):
53+
self.headers = {}
54+
55+
def add(self, key, value):
56+
"""Add a header.
57+
Args:
58+
key (string): header key.
59+
value (string): header value.
60+
Raises:
61+
Exception: _description_
62+
"""
63+
if not key.startswith("$memphis"):
64+
self.headers[key] = value
65+
else:
66+
raise MemphisHeaderError(
67+
"Keys in headers should not start with $memphis")
68+
5169

5270
class Memphis:
5371
def __init__(self):
@@ -63,6 +81,7 @@ def __init__(self):
6381
self.station_schemaverse_to_dls = {}
6482
self.update_configurations_sub = {}
6583
self.configuration_tasks = {}
84+
self.producers_map = dict()
6685

6786
async def get_msgs_update_configurations(self, iterable: Iterable):
6887
try:
@@ -259,7 +278,6 @@ async def close(self):
259278
try:
260279
if self.is_connection_active:
261280
await self.broker_manager.close()
262-
self.broker_manager = None
263281
self.connection_id = None
264282
self.is_connection_active = False
265283
keys_schema_updates_subs = self.schema_updates_subs.keys()
@@ -281,6 +299,7 @@ async def close(self):
281299
await sub.unsubscribe()
282300
if self.update_configurations_sub is not None:
283301
await self.update_configurations_sub.unsubscribe()
302+
self.producers_map.clear()
284303
except:
285304
return
286305

@@ -347,7 +366,10 @@ async def producer(self, station_name: str, producer_name: str, generate_random_
347366
elif self.schema_updates_data[station_name_internal]['type'] == "graphql":
348367
self.graphql_schemas[station_name_internal] = build_graphql_schema(
349368
self.schema_updates_data[station_name_internal]['active_version']['schema_content'])
350-
return Producer(self, producer_name, station_name)
369+
producer = Producer(self, producer_name, station_name)
370+
map_key = station_name_internal+"_"+producer_name.lower()
371+
self.producers_map[map_key] = producer
372+
return producer
351373

352374
except Exception as e:
353375
raise MemphisError(str(e)) from e
@@ -463,28 +485,39 @@ async def consumer(self, station_name: str, consumer_name: str, consumer_group:
463485
raise MemphisError(err_msg)
464486

465487
return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages)
466-
467488
except Exception as e:
468489
raise MemphisError(str(e)) from e
469490

470-
471-
class Headers:
472-
def __init__(self):
473-
self.headers = {}
474-
475-
def add(self, key, value):
476-
"""Add a header.
491+
async def produce(self, station_name: str, producer_name: str, message, generate_random_suffix: bool =False, ack_wait_sec: int = 15, headers: Union[Headers, None] = None, async_produce: bool=False, msg_id: Union[str, None]= None):
492+
"""Produces a message into a station without the need to create a producer.
477493
Args:
478-
key (string): header key.
479-
value (string): header value.
494+
station_name (str): station name to produce messages into.
495+
producer_name (str): name for the producer.
496+
message (bytearray/dict): message to send into the station - bytearray/protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
497+
generate_random_suffix (bool): false by default, if true concatenate a random suffix to producer's name
498+
ack_wait_sec (int, optional): max time in seconds to wait for an ack from memphis. Defaults to 15.
499+
headers (dict, optional): Message headers, defaults to {}.
500+
async_produce (boolean, optional): produce operation won't wait for broker acknowledgement
501+
msg_id (string, optional): Attach msg-id header to the message in order to achieve idempotency
480502
Raises:
481503
Exception: _description_
504+
Exception: _description_
482505
"""
483-
if not key.startswith("$memphis"):
484-
self.headers[key] = value
485-
else:
486-
raise MemphisHeaderError(
487-
"Keys in headers should not start with $memphis")
506+
try:
507+
station_name_internal = get_internal_name(station_name)
508+
map_key = station_name_internal+"_"+producer_name.lower()
509+
producer = None
510+
if map_key in self.producers_map:
511+
producer = self.producers_map[map_key]
512+
else:
513+
producer = await self.producer(station_name=station_name, producer_name=producer_name, generate_random_suffix=generate_random_suffix)
514+
self.producers_map[map_key] = producer
515+
await producer.produce(message=message, ack_wait_sec=ack_wait_sec, headers=headers, async_produce= async_produce, msg_id=msg_id)
516+
except Exception as e:
517+
raise MemphisError(str(e)) from e
518+
519+
def is_connected(self):
520+
return self.broker_manager.is_connected
488521

489522

490523
class Station:
@@ -523,6 +556,8 @@ async def destroy(self):
523556
if sub is not None:
524557
await sub.unsubscribe()
525558

559+
self.connection.producers_map = {k: v for k, v in self.connection.producers_map.items() if self.name not in k}
560+
526561
except Exception as e:
527562
raise MemphisError(str(e)) from e
528563

@@ -753,6 +788,9 @@ async def destroy(self):
753788
if sub is not None:
754789
await sub.unsubscribe()
755790

791+
map_key = station_name_internal+"_"+self.producer_name.lower()
792+
del self.connection.producers_map[map_key]
793+
756794
except Exception as e:
757795
raise Exception(e)
758796

0 commit comments

Comments
 (0)