Skip to content

Commit d802c80

Browse files
authored
Merge pull request #8 from jason810496/feat/set-vt-method
Feat/set_vt
2 parents 48d4f6b + 81fcf87 commit d802c80

File tree

2 files changed

+131
-0
lines changed

2 files changed

+131
-0
lines changed

pgmq_sqlalchemy/queue.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,108 @@ def read_with_poll(
810810
queue_name, vt, qty, max_poll_seconds, poll_interval_ms
811811
)
812812

813+
def _set_vt_sync(
814+
self, queue_name: str, msg_id: int, vt_offset: int
815+
) -> Optional[Message]:
816+
"""Set the visibility timeout for a message."""
817+
with self.session_maker() as session:
818+
row = session.execute(
819+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
820+
{"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset},
821+
).fetchone()
822+
session.commit()
823+
if row is None:
824+
return None
825+
return Message(
826+
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
827+
)
828+
829+
async def _set_vt_async(
830+
self, queue_name: str, msg_id: int, vt_offset: int
831+
) -> Optional[Message]:
832+
"""Set the visibility timeout for a message."""
833+
async with self.session_maker() as session:
834+
row = (
835+
await session.execute(
836+
text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"),
837+
{
838+
"queue_name": queue_name,
839+
"msg_id": msg_id,
840+
"vt_offset": vt_offset,
841+
},
842+
)
843+
).fetchone()
844+
await session.commit()
845+
print("row", row)
846+
if row is None:
847+
return None
848+
return Message(
849+
msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4]
850+
)
851+
852+
def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]:
853+
"""
854+
.. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt`
855+
.. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt`
856+
857+
Set the visibility timeout for a message.
858+
859+
Args:
860+
queue_name (str): The name of the queue.
861+
msg_id (int): The message id.
862+
vt_offset (int): The visibility timeout in seconds.
863+
864+
Returns:
865+
|schema_message_class|_ or ``None`` if the message does not exist.
866+
867+
Usage:
868+
869+
.. code-block:: python
870+
871+
msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10)
872+
msg = pgmq_client.read('my_queue')
873+
assert msg is not None
874+
msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10)
875+
assert msg is not None
876+
877+
.. tip::
878+
| |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism.
879+
| `ref: Exponential Backoff And Jitter <https://aws.amazon.com/tw/blogs/architecture/exponential-backoff-and-jitter/>`_.
880+
| **For example:**
881+
882+
.. code-block:: python
883+
884+
from pgmq_sqlalchemy import PGMQueue
885+
from pgmq_sqlalchemy.schema import Message
886+
887+
def _exp_backoff_retry(msg: Message)->int:
888+
# exponential backoff retry
889+
if msg.read_ct < 5:
890+
return 2 ** msg.read_ct
891+
return 2 ** 5
892+
893+
def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
894+
msg = pgmq_client.read(
895+
queue_name=queue_name,
896+
vt=1000, # set vt to 1000 seconds temporarily
897+
)
898+
if msg is None:
899+
return
900+
901+
# set exponential backoff retry
902+
pgmq_client.set_vt(
903+
queue_name=query_name,
904+
msg_id=msg.msg_id,
905+
vt_offset=_exp_backoff_retry(msg)
906+
)
907+
908+
"""
909+
if self.is_async:
910+
return self.loop.run_until_complete(
911+
self._set_vt_async(queue_name, msg_id, vt_offset)
912+
)
913+
return self._set_vt_sync(queue_name, msg_id, vt_offset)
914+
813915
def _pop_sync(self, queue_name: str) -> Optional[Message]:
814916
with self.session_maker() as session:
815917
row = session.execute(

tests/test_queue.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,35 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
242242
assert duration > 1.9
243243

244244

245+
def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
246+
pgmq, queue_name = pgmq_setup_teardown
247+
msg = MSG
248+
msg_id = pgmq.send(queue_name, msg)
249+
msg_read = pgmq.set_vt(queue_name, msg_id, 2)
250+
assert msg is not None
251+
assert pgmq.read(queue_name) is None
252+
time.sleep(1.5)
253+
assert pgmq.read(queue_name) is None
254+
time.sleep(0.6)
255+
msg_read = pgmq.read(queue_name)
256+
assert msg_read.message == msg
257+
258+
259+
def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
260+
pgmq, queue_name = pgmq_setup_teardown
261+
msg = MSG
262+
msg_id = pgmq.send(queue_name, msg)
263+
_ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds
264+
assert msg is not None
265+
assert pgmq.read(queue_name) is None
266+
time.sleep(0.5)
267+
assert pgmq.set_vt(queue_name, msg_id, 1) is not None
268+
time.sleep(0.3)
269+
assert pgmq.read(queue_name) is None
270+
time.sleep(0.8)
271+
assert pgmq.read(queue_name) is not None
272+
273+
245274
def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE):
246275
pgmq, queue_name = pgmq_setup_teardown
247276
msg = MSG

0 commit comments

Comments
 (0)