Skip to content

Commit a448492

Browse files
idonaaman123ido
andauthored
limit batchsize (#138)
* limit batchsize * change error message --------- Co-authored-by: ido <ido@ip-192-168-1-78.eu-central-1.compute.internal>
1 parent 5b5536f commit a448492

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

memphis/consumer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010

1111
class Consumer:
12+
MAX_BATCH_SIZE = 5000
1213
def __init__(
1314
self,
1415
connection,
@@ -108,14 +109,17 @@ async def __consume_dls(self):
108109
self.context,
109110
)
110111
except Exception as e:
111-
await self.dls_callback_func([], MemphisError(str(e)), self.context)
112-
return
112+
if self.dls_callback_func != None:
113+
await self.dls_callback_func([], MemphisError(str(e)), self.context)
114+
return
113115

114116
async def fetch(self, batch_size: int = 10):
115117
"""Fetch a batch of messages."""
116118
messages = []
117119
if self.connection.is_connection_active:
118120
try:
121+
if batch_size > self.MAX_BATCH_SIZE:
122+
raise MemphisError(f"Batch size can not be greater than {self.MAX_BATCH_SIZE}")
119123
self.batch_size = batch_size
120124
if len(self.dls_messages) > 0:
121125
if len(self.dls_messages) <= batch_size:

memphis/memphis.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141

4242
class Memphis:
43+
MAX_BATCH_SIZE = 5000
4344
def __init__(self):
4445
self.is_connection_active = False
4546
self.schema_updates_data = {}
@@ -499,6 +500,8 @@ async def consumer(
499500
try:
500501
if not self.is_connection_active:
501502
raise MemphisError("Connection is dead")
503+
if batch_size > self.MAX_BATCH_SIZE:
504+
raise MemphisError(f"Batch size can not be greater than {self.MAX_BATCH_SIZE}")
502505
real_name = consumer_name.lower()
503506
if generate_random_suffix:
504507
consumer_name = self.__generateRandomSuffix(consumer_name)
@@ -639,6 +642,8 @@ async def fetch_messages(
639642
consumer = None
640643
if not self.is_connection_active:
641644
raise MemphisError("Cant fetch messages without being connected!")
645+
if batch_size > self.MAX_BATCH_SIZE:
646+
raise MemphisError(f"Batch size can not be greater than {self.MAX_BATCH_SIZE}")
642647
internal_station_name = get_internal_name(station_name)
643648
consumer_map_key = internal_station_name + "_" + consumer_name.lower()
644649
if consumer_map_key in self.consumers_map:

0 commit comments

Comments
 (0)