Skip to content

Commit 10b9172

Browse files
committed
feat: add set_vt_method and usage docs
1 parent d7457a2 commit 10b9172

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed

pgmq_sqlalchemy/queue.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,108 @@ def read_with_poll(
810810
queue_name, vt, qty, max_poll_seconds, poll_interval_ms
811811
)
812812

813+
def _set_vt_sync(
814+
self, queue_name: str, msg_id: int, vt_offset: int
815+
) -> Optional[Message]:
816+
"""Set the visibility timeout for a message."""
817+
with self.session_maker() as session:
818+
row = session.execute(
819+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
820+
{"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset},
821+
).fetchone()
822+
session.commit()
823+
if row is None:
824+
return None
825+
return Message(
826+
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
827+
)
828+
829+
async def _set_vt_async(
830+
self, queue_name: str, msg_id: int, vt_offset: int
831+
) -> Optional[Message]:
832+
"""Set the visibility timeout for a message."""
833+
async with self.session_maker() as session:
834+
row = (
835+
await session.execute(
836+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
837+
{
838+
"queue_name": queue_name,
839+
"msg_id": msg_id,
840+
"vt_offset": vt_offset,
841+
},
842+
)
843+
).fetchone()
844+
await session.commit()
845+
print("row", row)
846+
if row is None:
847+
return None
848+
return Message(
849+
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
850+
)
851+
852+
def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]:
853+
"""
854+
.. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt`
855+
.. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt`
856+
857+
Set the visibility timeout for a message.
858+
859+
Args:
860+
queue_name (str): The name of the queue.
861+
msg_id (int): The message id.
862+
vt_offset (int): The visibility timeout in seconds.
863+
864+
Returns:
865+
|schema_message_class|_ or ``None`` if the message does not exist.
866+
867+
Usage:
868+
869+
.. code-block:: python
870+
871+
msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10)
872+
msg = pgmq_client.read('my_queue')
873+
assert msg is not None
874+
msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
875+
assert msg is not None
876+
877+
.. tip::
878+
| |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism.
879+
| `ref: Exponential Backoff And Jitter <https://aws.amazon.com/tw/blogs/architecture/exponential-backoff-and-jitter/>`_.
880+
| **For example:**
881+
882+
.. code-block:: python
883+
884+
from pgmq_sqlalchemy import PGMQueue
885+
from pgmq_sqlalchemy.schema import Message
886+
887+
def _exp_backoff_retry(msg: Message)->int:
888+
# exponential backoff retry
889+
if msg.read_ct < 5:
890+
return 2 ** msg.read_ct
891+
return 2 ** 5
892+
893+
def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
894+
msg = pgmq_client.read(
895+
queue_name=queue_name,
896+
vt=1000, # set vt to 1000 seconds temporarily
897+
)
898+
if msg is None:
899+
return
900+
901+
# set exponential backoff retry
902+
pgmq_client.set_vt(
903+
queue_name=query_name,
904+
msg_id=msg.msg_id,
905+
vt_offset=_exp_backoff_retry(msg)
906+
)
907+
908+
"""
909+
if self.is_async:
910+
return self.loop.run_until_complete(
911+
self._set_vt_async(queue_name, msg_id, vt_offset)
912+
)
913+
return self._set_vt_sync(queue_name, msg_id, vt_offset)
914+
813915
def _pop_sync(self, queue_name: str) -> Optional[Message]:
814916
with self.session_maker() as session:
815917
row = session.execute(

0 commit comments

Comments
 (0)