@@ -146,7 +146,8 @@ def disconnect(self) -> None:
146146 self ._connection .close ()
147147
148148 if self ._thread :
149- self ._thread .join ()
149+ # If gracefully shutting down, we should finish up the current job.
150+ self ._thread .join (5 if self .considered_unrecoverable () else None )
150151 self ._thread = None
151152
152153 def is_connected (self ) -> bool :
@@ -163,7 +164,7 @@ def is_connected(self) -> bool:
163164 def considered_unrecoverable (self ) -> bool :
164165 return self ._unrecoverable
165166
166- def publish (self , topic : str , payload : bytes , persist : bool ) -> None : # noqa: ARG002 (TODO handle persistence)
167+ def publish (self , topic : str , payload : bytes , persist : bool ) -> None :
167168 """Publish the given message.
168169
169170 Publish payload with the pre-existing connection (via connect()) on topic.
@@ -180,9 +181,9 @@ def publish(self, topic: str, payload: bytes, persist: bool) -> None: # noqa: A
180181 body = payload ,
181182 properties = pika .BasicProperties (
182183 content_type = 'text/plain' ,
183- # delivery_mode=pika.delivery_mode.DeliveryMode.Persistent
184- # if persist
185- # else pika.delivery_mode.DeliveryMode.Transient,
184+ delivery_mode = pika .delivery_mode .DeliveryMode .Persistent
185+ if persist
186+ else pika .delivery_mode .DeliveryMode .Transient ,
186187 # expiration=None if persist else '8640000',
187188 ),
188189 )
@@ -367,7 +368,9 @@ def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None:
367368 If True, this queue will persist forever, even on application or broker shutdown, and we need a persistent name.
368369 If False, we will generate a temporary queue using the broker's naming scheme.
369370 """
370- cb = functools .partial (self ._on_queue_declareok , channel = channel , topic = topic )
371+ cb = functools .partial (
372+ self ._on_queue_declareok , channel = channel , topic = topic , persist = persist
373+ )
371374 channel .queue_declare (
372375 queue = _get_queue_name (topic )
373376 if persist
@@ -377,7 +380,9 @@ def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None:
377380 callback = cb ,
378381 )
379382
380- def _on_queue_declareok (self , frame : Frame , channel : Channel , topic : str ) -> None :
383+ def _on_queue_declareok (
384+ self , frame : Frame , channel : Channel , topic : str , persist : bool
385+ ) -> None :
381386 """Begins listening on the given queue.
382387
383388 Used as a listener on queue declaration.
@@ -386,10 +391,15 @@ def _on_queue_declareok(self, frame: Frame, channel: Channel, topic: str) -> Non
386391 frame: Response from the queue declare we sent to the AMQP broker. We get the queue name from this.
387392 channel: The Channel being instantiated.
388393 topic: The string name for the Channel on the broker.
394+ persist: Whether or not our queue should persist on either broker or application shutdown.
389395 """
390396 queue_name = frame .method .queue
391397 cb = functools .partial (
392- self ._on_queue_bindok , channel = channel , topic = topic , queue_name = queue_name
398+ self ._on_queue_bindok ,
399+ channel = channel ,
400+ topic = topic ,
401+ queue_name = queue_name ,
402+ persist = persist ,
393403 )
394404 channel .queue_bind (
395405 queue = queue_name ,
@@ -399,7 +409,12 @@ def _on_queue_declareok(self, frame: Frame, channel: Channel, topic: str) -> Non
399409 )
400410
401411 def _on_queue_bindok (
402- self , _unused_frame : Frame , channel : Channel , topic : str , queue_name : str
412+ self ,
413+ _unused_frame : Frame ,
414+ channel : Channel ,
415+ topic : str ,
416+ queue_name : str ,
417+ persist : bool ,
403418 ) -> None :
404419 """Consumes a message from the given channel.
405420
@@ -410,12 +425,14 @@ def _on_queue_bindok(
410425 channel: The Channel being instantiated.
411426 topic: Name of the topic on the broker.
412427 queue_name: The name of the queue on the AMQP broker.
428+ persist: Whether or not our queue should persist on either broker or application shutdown.
413429 """
414430 cb = functools .partial (self ._on_consume_ok , topic = topic )
431+ message_cb = functools .partial (self ._consume_message , persist = persist )
415432 consumer_tag = channel .basic_consume (
416433 queue = queue_name ,
417- auto_ack = True ,
418- on_message_callback = self . _consume_message ,
434+ auto_ack = not persist , # persistent messages should be manually acked and we have no reason to NACK a message for now
435+ on_message_callback = message_cb ,
419436 callback = cb ,
420437 )
421438 self ._topics_to_consumer_tags [topic ] = consumer_tag
@@ -433,23 +450,32 @@ def _on_consume_ok(self, _unused_frame: Frame, topic: str) -> None:
433450
434451 def _consume_message (
435452 self ,
436- _unused_channel : Channel ,
453+ channel : Channel ,
437454 basic_deliver : Basic .Deliver ,
438455 _properties : BasicProperties ,
439456 body : bytes ,
457+ persist : bool ,
440458 ) -> None :
441- """Handles incoming messages.
459+ """Handles incoming messages and acknowledges them ONLY after code executes on the domain side .
442460
443461 Looks up all handlers for the topic and delegates message handling to them.
462+ The handlers comprise the Service/Client logic, which includes all domain science logic.
444463
445464 Args:
446- _unused_channel : The AMQP channel the message was received on. Ignored
465+ channel : The AMQP channel the message was received on. Used to manually acknowledge messages.
447466 basic_deliver: Contains internal AMQP delivery information - i.e. the routing key.
448467 _properties: Object from the AMQP call. Ignored.
449468 body: the AMQP message to be handled.
469+ persist: Whether or not our queue should persist on either broker or application shutdown.
450470 """
451471 tth_key = _amqp_2_hierarchy (basic_deliver .routing_key )
452472 topic_handler = self ._topics_to_handlers ().get (tth_key )
453473 if topic_handler :
454474 for cb in topic_handler .callbacks :
455475 cb (body )
476+ # With persistent messages, we only acknowledge the message AFTER we are done processing
477+ # (this removes the message from the broker queue)
478+ # this allows us to retry a message if the broker OR this application goes down
479+ # We currently never NACK or reject a message because in INTERSECT, applications currently never "share" a queue.
480+ if persist :
481+ channel .basic_ack (basic_deliver .delivery_tag )
0 commit comments