@@ -631,7 +631,6 @@ async def produce(
631631 except Exception as e :
632632 raise MemphisError (str (e )) from e
633633
634-
635634 async def fetch_messages (
636635 self ,
637636 station_name : str ,
@@ -643,8 +642,8 @@ async def fetch_messages(
643642 max_msg_deliveries : int = 10 ,
644643 generate_random_suffix : bool = False ,
645644 start_consume_from_sequence : int = 1 ,
646- last_messages : int = - 1
647- ):
645+ last_messages : int = - 1 ,
646+ ):
648647 """Consume a batch of messages.
649648 Args:.
650649 station_name (str): station name to consume messages from.
@@ -669,7 +668,18 @@ async def fetch_messages(
669668 if consumer_map_key in self .consumers_map :
670669 consumer = self .consumers_map [consumer_map_key ]
671670 else :
672- consumer = await self .consumer (station_name = station_name , consumer_name = consumer_name , consumer_group = consumer_group , batch_size = batch_size , batch_max_time_to_wait_ms = batch_max_time_to_wait_ms , max_ack_time_ms = max_ack_time_ms , max_msg_deliveries = max_msg_deliveries , generate_random_suffix = generate_random_suffix , start_consume_from_sequence = start_consume_from_sequence , last_messages = last_messages )
671+ consumer = await self .consumer (
672+ station_name = station_name ,
673+ consumer_name = consumer_name ,
674+ consumer_group = consumer_group ,
675+ batch_size = batch_size ,
676+ batch_max_time_to_wait_ms = batch_max_time_to_wait_ms ,
677+ max_ack_time_ms = max_ack_time_ms ,
678+ max_msg_deliveries = max_msg_deliveries ,
679+ generate_random_suffix = generate_random_suffix ,
680+ start_consume_from_sequence = start_consume_from_sequence ,
681+ last_messages = last_messages ,
682+ )
673683 messages = await consumer .fetch (batch_size )
674684 if messages == None :
675685 messages = []
@@ -1023,7 +1033,6 @@ def default_error_handler(e):
10231033 print ("ping exception raised" , e )
10241034
10251035
1026-
10271036class Consumer :
10281037 def __init__ (
10291038 self ,
@@ -1061,7 +1070,6 @@ def __init__(
10611070 self .dls_callback_func = None
10621071 self .t_dls = asyncio .create_task (self .__consume_dls ())
10631072
1064-
10651073 def set_context (self , context ):
10661074 """Set a context (dict) that will be passed to each message handler call."""
10671075 self .context = context
@@ -1112,10 +1120,12 @@ async def __consume_dls(self):
11121120 )
11131121 async for msg in self .consumer_dls .messages :
11141122 index_to_insert = self .dls_current_index
1115- if index_to_insert >= 10000 :
1116- index_to_insert %= 10000
1117- self .dls_messages .insert (index_to_insert , Message (msg , self .connection , self .consumer_group ))
1118- self .dls_current_index += 1
1123+ if index_to_insert >= 10000 :
1124+ index_to_insert %= 10000
1125+ self .dls_messages .insert (
1126+ index_to_insert , Message (msg , self .connection , self .consumer_group )
1127+ )
1128+ self .dls_current_index += 1
11191129 if self .dls_callback_func != None :
11201130 await self .dls_callback_func (
11211131 [Message (msg , self .connection , self .consumer_group )],
@@ -1132,7 +1142,7 @@ async def fetch(self, batch_size: int = 10):
11321142 if self .connection .is_connection_active :
11331143 try :
11341144 self .batch_size = batch_size
1135- if len (self .dls_messages )> 0 :
1145+ if len (self .dls_messages ) > 0 :
11361146 if len (self .dls_messages ) <= batch_size :
11371147 messages = self .dls_messages
11381148 self .dls_messages = []
@@ -1151,7 +1161,7 @@ async def fetch(self, batch_size: int = 10):
11511161 subject = get_internal_name (self .station_name )
11521162 consumer_group = get_internal_name (self .consumer_group )
11531163 self .psub = await self .connection .broker_connection .pull_subscribe (
1154- subject + ".final" , durable = durableName
1164+ subject + ".final" , durable = durableName
11551165 )
11561166 msgs = await self .psub .fetch (batch_size )
11571167 for msg in msgs :
@@ -1217,9 +1227,8 @@ async def ack(self):
12171227 await self .message .ack ()
12181228 except Exception as e :
12191229 if (
1220- "$memphis_pm_id"
1221- in self .message .headers and "$memphis_pm_sequence"
1222- in self .message .headers
1230+ "$memphis_pm_id" in self .message .headers
1231+ and "$memphis_pm_sequence" in self .message .headers
12231232 ):
12241233 try :
12251234 msg = {
0 commit comments