From 10b917281c475e4dcbfa63fbe64c5dee84b9c4d4 Mon Sep 17 00:00:00 2001 From: jason810496 <810496@email.wlsh.tyc.edu.tw> Date: Sat, 3 Aug 2024 23:44:17 +0800 Subject: [PATCH 1/4] feat: add set_vt_method and usage docs --- pgmq_sqlalchemy/queue.py | 102 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index c1afb43..07e8615 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -810,6 +810,108 @@ def read_with_poll( queue_name, vt, qty, max_poll_seconds, poll_interval_ms ) + def _set_vt_sync( + self, queue_name: str, msg_id: int, vt_offset: int + ) -> Optional[Message]: + """Set the visibility timeout for a message.""" + with self.session_maker() as session: + row = session.execute( + text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"), + {"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset}, + ).fetchone() + session.commit() + if row is None: + return None + return Message( + msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] + ) + + async def _set_vt_async( + self, queue_name: str, msg_id: int, vt_offset: int + ) -> Optional[Message]: + """Set the visibility timeout for a message.""" + async with self.session_maker() as session: + row = ( + await session.execute( + text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"), + { + "queue_name": queue_name, + "msg_id": msg_id, + "vt_offset": vt_offset, + }, + ) + ).fetchone() + await session.commit() + print("row", row) + if row is None: + return None + return Message( + msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] + ) + + def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]: + """ + .. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt` + .. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt` + + Set the visibility timeout for a message. + + Args: + queue_name (str): The name of the queue. + msg_id (int): The message id. + vt_offset (int): The visibility timeout in seconds. + + Returns: + |schema_message_class|_ or ``None`` if the message does not exist. + + Usage: + + .. code-block:: python + + msg_id = pgmq_client.send('my_queue', {'key': 'value'}, delay=10) + msg = pgmq_client.read('my_queue') + assert msg is not None + msg = pgmq_client.set_vt('my_queue', msg.msg_id, 10) + assert msg is not None + + .. tip:: + | |read_method|_ and |set_vt_method|_ can be used together to implement **exponential backoff** mechanism. + | `ref: Exponential Backoff And Jitter `_. + | **For example:** + + .. code-block:: python + + from pgmq_sqlalchemy import PGMQueue + from pgmq_sqlalchemy.schema import Message + + def _exp_backoff_retry(msg: Message)->int: + # exponential backoff retry + if msg.read_ct < 5: + return 2 ** msg.read_ct + return 2 ** 5 + + def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str): + msg = pgmq_client.read( + queue_name=queue_name, + vt=1000, # set vt to 1000 seconds temporarily + ) + if msg is None: + return + + # set exponential backoff retry + pgmq_client.set_vt( + queue_name=query_name, + msg_id=msg.msg_id, + vt_offset=_exp_backoff_retry(msg) + ) + + """ + if self.is_async: + return self.loop.run_until_complete( + self._set_vt_async(queue_name, msg_id, vt_offset) + ) + return self._set_vt_sync(queue_name, msg_id, vt_offset) + def _pop_sync(self, queue_name: str) -> Optional[Message]: with self.session_maker() as session: row = session.execute( From 81fcf87b8d90a647293e2f938926396d5995efab Mon Sep 17 00:00:00 2001 From: jason810496 <810496@email.wlsh.tyc.edu.tw> Date: Sat, 3 Aug 2024 23:50:21 +0800 Subject: [PATCH 2/4] test: add test for set_vt method --- tests/test_queue.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index fd53e38..6eb326f 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -242,6 +242,35 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): assert duration > 1.9 +def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): + pgmq, queue_name = pgmq_setup_teardown + msg = MSG + msg_id = pgmq.send(queue_name, msg) + msg_read = pgmq.set_vt(queue_name, msg_id, 2) + assert msg is not None + assert pgmq.read(queue_name) is None + time.sleep(1.5) + assert pgmq.read(queue_name) is None + time.sleep(0.6) + msg_read = pgmq.read(queue_name) + assert msg_read.message == msg + + +def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE): + pgmq, queue_name = pgmq_setup_teardown + msg = MSG + msg_id = pgmq.send(queue_name, msg) + _ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds + assert msg is not None + assert pgmq.read(queue_name) is None + time.sleep(0.5) + assert pgmq.set_vt(queue_name, msg_id, 1) is not None + time.sleep(0.3) + assert pgmq.read(queue_name) is None + time.sleep(0.8) + assert pgmq.read(queue_name) is not None + + def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG From 8c19bb0ed0370d9965dea90f38f02b3d92619fef Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:12:49 +0000 Subject: [PATCH 3/4] Initial plan From 552136c1b843e03b80b7d4e41e09ccc838087eed Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:29:01 +0000 Subject: [PATCH 4/4] Fix bug: vt=0 should be valid visibility timeout value Added check for `vt is None` in the read() method to properly handle vt=0. Previously, vt=0 would be treated as falsy and would default to self.vt (30 seconds). Now vt=0 correctly sets the visibility timeout to 0 seconds as expected. Also added test case to verify vt=0 works correctly. Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/queue.py | 2 ++ tests/test_queue.py | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index ea271a4..ad0770d 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -573,6 +573,8 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]: """ + if vt is None: + vt = self.vt if self.is_async: return self.loop.run_until_complete(self._read_async(queue_name, vt)) return self._read_sync(queue_name, vt) diff --git a/tests/test_queue.py b/tests/test_queue.py index 6eb326f..f01a494 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -162,6 +162,21 @@ def test_send_and_read_msg_with_vt_and_delay(pgmq_setup_teardown: PGMQ_WITH_QUEU assert msg_read.msg_id == msg_id +def test_send_and_read_msg_with_vt_zero(pgmq_setup_teardown: PGMQ_WITH_QUEUE): + """Test that vt=0 works correctly and message becomes visible immediately.""" + pgmq, queue_name = pgmq_setup_teardown + msg = MSG + msg_id: int = pgmq.send(queue_name, msg) + # Read with vt=0 means message should be immediately visible again + msg_read = pgmq.read(queue_name, vt=0) + assert msg_read.message == msg + assert msg_read.msg_id == msg_id + # Message should be visible immediately (no waiting) + msg_read = pgmq.read(queue_name) + assert msg_read.message == msg + assert msg_read.msg_id == msg_id + + def test_read_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg_read = pgmq.read(queue_name)